# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OtaTools class."""

import logging
import os
import tempfile

from acloud import errors
from acloud.internal.lib import utils

logger = logging.getLogger(__name__)

_BIN_DIR_NAME = "bin"
_LPMAKE = "lpmake"
_BUILD_SUPER_IMAGE = "build_super_image"
_AVBTOOL = "avbtool"
_SGDISK = "sgdisk"
_SIMG2IMG = "simg2img"
_MK_COMBINED_IMG = "mk_combined_img"
_UNPACK_BOOTIMG = "unpack_bootimg"

_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
_AVBTOOL_TIMEOUT_SECS = 30
_MK_COMBINED_IMG_TIMEOUT_SECS = 180
_UNPACK_BOOTIMG_TIMEOUT_SECS = 30

_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` "
                          "in build environment, or set --local-tool to an "
                          "extracted otatools.zip.")


def FindOtaToolsDir(search_paths):
    """Find OTA tools directory in the search paths.

    A directory qualifies when it contains bin/build_super_image. The first
    qualifying directory in search_paths is returned.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        The directory containing OTA tools.

    Raises:
        errors.CheckPathError if OTA tools are not found.
    """
    for search_path in search_paths:
        if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME,
                                       _BUILD_SUPER_IMAGE)):
            return search_path
    raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG %
                                {"tool_name": "OTA tool directory"})


def FindOtaTools(search_paths):
    """Find OTA tools in the search paths.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        An OtaTools object.

    Raises:
        errors.CheckPathError if OTA tools are not found.
    """
    return OtaTools(FindOtaToolsDir(search_paths))


def GetImageForPartition(partition_name, image_dir, **image_paths):
    """Map a partition name to an image path.

    This function is used with BuildSuperImage or MkCombinedImg to mix
    image_dir and image_paths into the output file.

    Args:
        partition_name: String, e.g., "system", "product", and "vendor".
        image_dir: String, the directory to search for the images that are not
                   given in image_paths.
        image_paths: Pairs of partition names and image paths.

    Returns:
        The image path if the partition is in image_paths.
        Otherwise, this function returns the path under image_dir.

    Raises:
        errors.GetLocalImageError if the image does not exist.
    """
    # A missing or empty entry in image_paths falls back to
    # <image_dir>/<partition_name>.img.
    image_path = (image_paths.get(partition_name) or
                  os.path.join(image_dir, partition_name + ".img"))
    if not os.path.isfile(image_path):
        raise errors.GetLocalImageError(
            "Cannot find image for partition %s" % partition_name)
    return image_path


class OtaTools:
    """The class that executes OTA tool commands."""

    def __init__(self, ota_tools_dir):
        """Initialize with the OTA tools directory.

        Args:
            ota_tools_dir: String, the directory that contains the bin/
                           subdirectory with the OTA tool binaries. It is
                           stored as an absolute path.
        """
        self._ota_tools_dir = os.path.abspath(ota_tools_dir)

    def _GetBinary(self, name):
        """Get an executable file from _ota_tools_dir.

        Args:
            name: String, the file name.

        Returns:
            String, the absolute path.

        Raises:
            errors.NoExecuteCmd if the file does not exist.
        """
        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
        if not os.path.isfile(path):
            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
                                      {"tool_name": name})
        utils.SetExecutable(path)
        return path

    @staticmethod
    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
        """Rewrite lpmake and image paths in misc_info.txt.

        Misc info consists of multiple lines of <key>=<value>.
        Sample input_file:
        lpmake=lpmake
        dynamic_partition_list= system system_ext product vendor

        Sample output_file:
        lpmake=/path/to/lpmake
        dynamic_partition_list= system system_ext product vendor
        system_image=/path/to/system.img
        system_ext_image=/path/to/system_ext.img
        product_image=/path/to/product.img
        vendor_image=/path/to/vendor.img

        This method replaces lpmake with the specified path, and sets
        *_image for every partition in dynamic_partition_list. Any *_image
        entry that is already in input_file is dropped.

        Args:
            output_file: The output file object.
            input_file: The input file object.
            lpmake_path: The path to lpmake binary.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        partition_names = ()
        for line in input_file:
            split_line = line.strip().split("=", 1)
            if len(split_line) < 2:
                # A line without "=" is treated as a key with empty value.
                split_line = (split_line[0], "")
            if split_line[0] == "dynamic_partition_list":
                partition_names = split_line[1].split()
            elif split_line[0] == "lpmake":
                output_file.write("lpmake=%s\n" % lpmake_path)
                continue
            elif split_line[0].endswith("_image"):
                # Dropped here; regenerated from get_image below.
                continue
            # Terminate the copied line so that the *_image entries appended
            # below never end up glued to an unterminated last line.
            output_file.write(line if line.endswith("\n") else line + "\n")

        if not partition_names:
            logger.warning("No dynamic partition list in misc info.")

        for partition_name in partition_names:
            output_file.write("%s_image=%s\n" %
                              (partition_name, get_image(partition_name)))

    @utils.TimeExecute(function_description="Build super image")
    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
    def BuildSuperImage(self, output_path, misc_info_path, get_image):
        """Use build_super_image to create a super image.

        The misc info is rewritten into a temporary file, which is passed to
        build_super_image and removed afterwards.

        Args:
            output_path: The path to the output super image.
            misc_info_path: The path to the misc info that provides parameters
                            to create the super image.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if build_super_image or lpmake does not exist.
        """
        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
        lpmake = self._GetBinary(_LPMAKE)

        new_misc_info_path = None
        try:
            with open(misc_info_path, "r") as misc_info:
                # delete=False keeps the file after it is closed, so that
                # build_super_image can read it by path.
                with tempfile.NamedTemporaryFile(
                        prefix="misc_info_", suffix=".txt",
                        delete=False, mode="w") as new_misc_info:
                    new_misc_info_path = new_misc_info.name
                    self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
                                          get_image)

            utils.Popen(build_super_image, new_misc_info_path, output_path)
        finally:
            if new_misc_info_path:
                os.remove(new_misc_info_path)

    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
    def MakeDisabledVbmetaImage(self, output_path):
        """Use avbtool to create a vbmeta image with verification disabled.

        Args:
            output_path: The path to the output vbmeta image.

        Raises:
            errors.NoExecuteCmd if avbtool does not exist.
        """
        avbtool = self._GetBinary(_AVBTOOL)
        utils.Popen(avbtool, "make_vbmeta_image",
                    "--flag", "2",
                    "--padding_size", "4096",
                    "--output", output_path)

    @staticmethod
    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
        """Rewrite image paths in system-qemu-config.txt.

        Sample input_file:
        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
        out/target/product/generic_x86_64/super.img super 2

        Sample output_file:
        /path/to/vbmeta.img vbmeta 1
        /path/to/super.img super 2

        This method replaces the first entry of each line with the path
        returned by get_image. Lines that do not consist of exactly three
        whitespace-separated entries are copied unchanged.

        Args:
            output_file: The output file object.
            input_file: The input file object.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        for line in input_file:
            split_line = line.split()
            if len(split_line) == 3:
                # The second entry is the partition name.
                output_file.write("%s %s %s\n" % (get_image(split_line[1]),
                                                  split_line[1],
                                                  split_line[2]))
            else:
                output_file.write(line)

    @utils.TimeExecute(function_description="Make combined image")
    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
        """Use mk_combined_img to create a disk image.

        The config is rewritten into a temporary file, which is passed to
        mk_combined_img and removed afterwards.

        Args:
            output_path: The path to the output disk image.
            system_qemu_config_path: The path to the config that provides the
                                     partition information on the disk.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if mk_combined_img, sgdisk, or simg2img does
            not exist.
        """
        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
        sgdisk = self._GetBinary(_SGDISK)
        simg2img = self._GetBinary(_SIMG2IMG)

        new_config_path = None
        try:
            with open(system_qemu_config_path, "r") as config:
                # delete=False keeps the file after it is closed, so that
                # mk_combined_img can read it by path.
                with tempfile.NamedTemporaryFile(
                        prefix="system-qemu-config_", suffix=".txt",
                        delete=False, mode="w") as new_config:
                    new_config_path = new_config.name
                    self._RewriteSystemQemuConfig(new_config, config,
                                                  get_image)

            # The paths to the helper binaries are passed through the
            # environment.
            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
            utils.Popen(mk_combined_img,
                        "-i", new_config_path,
                        "-o", output_path,
                        env=mk_combined_img_env)
        finally:
            if new_config_path:
                os.remove(new_config_path)

    @utils.TimeExecute(function_description="Unpack boot image")
    @utils.TimeoutException(_UNPACK_BOOTIMG_TIMEOUT_SECS)
    def UnpackBootImg(self, out_dir, boot_img):
        """Use unpack_bootimg to unpack a boot image to a directory.

        Args:
            out_dir: The output directory.
            boot_img: The path to the boot image.

        Raises:
            errors.NoExecuteCmd if unpack_bootimg does not exist.
        """
        unpack_bootimg = self._GetBinary(_UNPACK_BOOTIMG)
        utils.Popen(unpack_bootimg,
                    "--out", out_dir,
                    "--boot_img", boot_img)