1# Copyright 2019 - The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""OtaTools class.""" 15 16import logging 17import os 18import subprocess 19import tempfile 20 21from six import b 22 23 24from acloud import errors 25from acloud.internal import constants 26from acloud.internal.lib import utils 27 28logger = logging.getLogger(__name__) 29 30_BIN_DIR_NAME = "bin" 31_LPMAKE = "lpmake" 32_BUILD_SUPER_IMAGE = "build_super_image" 33_AVBTOOL = "avbtool" 34_SGDISK = "sgdisk" 35_SIMG2IMG = "simg2img" 36_MK_COMBINED_IMG = "mk_combined_img" 37 38_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30 39_AVBTOOL_TIMEOUT_SECS = 30 40_MK_COMBINED_IMG_TIMEOUT_SECS = 180 41 42_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` " 43 "in build environment, or set --local-tool to an " 44 "extracted otatools.zip.") 45 46 47def FindOtaTools(search_paths): 48 """Find OTA tools in the search paths and in build environment. 49 50 Args: 51 search_paths: List of paths, the directories to search for OTA tools. 52 53 Returns: 54 The directory containing OTA tools. 55 56 Raises: 57 errors.CheckPathError if OTA tools are not found. 58 """ 59 for search_path in search_paths: 60 if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME, 61 _BUILD_SUPER_IMAGE)): 62 return search_path 63 for env_host_out in [constants.ENV_ANDROID_SOONG_HOST_OUT, 64 constants.ENV_ANDROID_HOST_OUT]: 65 host_out_dir = os.environ.get(env_host_out) 66 if (host_out_dir and 67 os.path.isfile(os.path.join(host_out_dir, _BIN_DIR_NAME, 68 _BUILD_SUPER_IMAGE))): 69 return host_out_dir 70 71 raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG % 72 {"tool_name": "OTA tool directory"}) 73 74 75def GetImageForPartition(partition_name, image_dir, **image_paths): 76 """Map a partition name to an image path. 77 78 This function is used with BuildSuperImage or MkCombinedImg to mix 79 image_dir and image_paths into the output file. 80 81 Args: 82 partition_name: String, e.g., "system", "product", and "vendor". 83 image_dir: String, the directory to search for the images that are not 84 given in image_paths. 85 image_paths: Pairs of partition names and image paths. 86 87 Returns: 88 The image path if the partition is in image_paths. 89 Otherwise, this function returns the path under image_dir. 90 91 Raises 92 errors.GetLocalImageError if the image does not exist. 93 """ 94 image_path = (image_paths.get(partition_name) or 95 os.path.join(image_dir, partition_name + ".img")) 96 if not os.path.isfile(image_path): 97 raise errors.GetLocalImageError( 98 "Cannot find image for partition %s" % partition_name) 99 return image_path 100 101 102class OtaTools: 103 """The class that executes OTA tool commands.""" 104 105 def __init__(self, ota_tools_dir): 106 self._ota_tools_dir = os.path.abspath(ota_tools_dir) 107 108 def _GetBinary(self, name): 109 """Get an executable file from _ota_tools_dir. 110 111 Args: 112 name: String, the file name. 113 114 Returns: 115 String, the absolute path. 116 117 Raises: 118 errors.NoExecuteCmd if the file does not exist. 119 """ 120 path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name) 121 if not os.path.isfile(path): 122 raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG % 123 {"tool_name": name}) 124 utils.SetExecutable(path) 125 return path 126 127 @staticmethod 128 def _ExecuteCommand(*command, **popen_args): 129 """Execute a command and log the output. 130 131 This method waits for the process to terminate. It kills the process 132 if it's interrupted due to timeout. 133 134 Args: 135 command: Strings, the command. 136 popen_kwargs: The arguments to be passed to subprocess.Popen. 137 138 Raises: 139 errors.SubprocessFail if the process returns non-zero. 140 """ 141 proc = None 142 try: 143 logger.info("Execute %s", command) 144 popen_args["stdin"] = subprocess.PIPE 145 popen_args["stdout"] = subprocess.PIPE 146 popen_args["stderr"] = subprocess.PIPE 147 148 # Some OTA tools are Python scripts in different versions. The 149 # PYTHONPATH for acloud may be incompatible with the tools. 150 if "env" not in popen_args and "PYTHONPATH" in os.environ: 151 popen_env = os.environ.copy() 152 del popen_env["PYTHONPATH"] 153 popen_args["env"] = popen_env 154 155 proc = subprocess.Popen(command, **popen_args) 156 stdout, stderr = proc.communicate() 157 logger.info("%s stdout: %s", command[0], stdout) 158 logger.info("%s stderr: %s", command[0], stderr) 159 160 if proc.returncode != 0: 161 raise errors.SubprocessFail("%s returned %d." % 162 (command[0], proc.returncode)) 163 finally: 164 if proc and proc.poll() is None: 165 logger.info("Kill %s", command[0]) 166 proc.kill() 167 168 @staticmethod 169 def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image): 170 """Rewrite lpmake and image paths in misc_info.txt. 171 172 Misc info consists of multiple lines of <key>=<value>. 173 Sample input_file: 174 lpmake=lpmake 175 dynamic_partition_list= system system_ext product vendor 176 177 Sample output_file: 178 lpmake=/path/to/lpmake 179 dynamic_partition_list= system system_ext product vendor 180 system_image=/path/to/system.img 181 system_ext_image=/path/to/system_ext.img 182 product_image=/path/to/product.img 183 vendor_image=/path/to/vendor.img 184 185 This method replaces lpmake with the specified path, and sets 186 *_image for every partition in dynamic_partition_list. 187 188 Args: 189 output_file: The output file object. 190 input_file: The input file object. 191 lpmake_path: The path to lpmake binary. 192 get_image: A function that takes the partition name as the 193 parameter and returns the image path. 194 """ 195 partition_names = () 196 for line in input_file: 197 split_line = line.strip().split("=", 1) 198 if len(split_line) < 2: 199 split_line = (split_line[0], "") 200 if split_line[0] == "dynamic_partition_list": 201 partition_names = split_line[1].split() 202 elif split_line[0] == "lpmake": 203 output_file.write(b("lpmake=%s\n" % lpmake_path)) 204 continue 205 elif split_line[0].endswith("_image"): 206 continue 207 output_file.write(b(line)) 208 209 if not partition_names: 210 logger.w("No dynamic partition list in misc info.") 211 212 for partition_name in partition_names: 213 output_file.write(b("%s_image=%s\n" % 214 (partition_name, get_image(partition_name)))) 215 216 @utils.TimeExecute(function_description="Build super image") 217 @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS) 218 def BuildSuperImage(self, output_path, misc_info_path, get_image): 219 """Use build_super_image to create a super image. 220 221 Args: 222 output_path: The path to the output super image. 223 misc_info_path: The path to the misc info that provides parameters 224 to create the super image. 225 get_image: A function that takes the partition name as the 226 parameter and returns the image path. 227 """ 228 build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE) 229 lpmake = self._GetBinary(_LPMAKE) 230 231 new_misc_info_path = None 232 try: 233 with open(misc_info_path, "r") as misc_info: 234 with tempfile.NamedTemporaryFile( 235 prefix="misc_info_", suffix=".txt", 236 delete=False) as new_misc_info: 237 new_misc_info_path = new_misc_info.name 238 self._RewriteMiscInfo(new_misc_info, misc_info, lpmake, 239 get_image) 240 241 self._ExecuteCommand(build_super_image, new_misc_info_path, 242 output_path) 243 finally: 244 if new_misc_info_path: 245 os.remove(new_misc_info_path) 246 247 @utils.TimeExecute(function_description="Make disabled vbmeta image.") 248 @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS) 249 def MakeDisabledVbmetaImage(self, output_path): 250 """Use avbtool to create a vbmeta image with verification disabled. 251 252 Args: 253 output_path: The path to the output vbmeta image. 254 """ 255 avbtool = self._GetBinary(_AVBTOOL) 256 self._ExecuteCommand(avbtool, "make_vbmeta_image", 257 "--flag", "2", 258 "--padding_size", "4096", 259 "--output", output_path) 260 261 @staticmethod 262 def _RewriteSystemQemuConfig(output_file, input_file, get_image): 263 """Rewrite image paths in system-qemu-config.txt. 264 265 Sample input_file: 266 out/target/product/generic_x86_64/vbmeta.img vbmeta 1 267 out/target/product/generic_x86_64/super.img super 2 268 269 Sample output_file: 270 /path/to/vbmeta.img vbmeta 1 271 /path/to/super.img super 2 272 273 This method replaces the first entry of each line with the path 274 returned by get_image. 275 276 Args: 277 output_file: The output file object. 278 input_file: The input file object. 279 get_image: A function that takes the partition name as the 280 parameter and returns the image path. 281 """ 282 for line in input_file: 283 split_line = line.split() 284 if len(split_line) == 3: 285 output_file.write(b("%s %s %s\n" % (get_image(split_line[1]), 286 split_line[1], 287 split_line[2]))) 288 else: 289 output_file.write(b(line)) 290 291 @utils.TimeExecute(function_description="Make combined image") 292 @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS) 293 def MkCombinedImg(self, output_path, system_qemu_config_path, get_image): 294 """Use mk_combined_img to create a disk image. 295 296 Args: 297 output_path: The path to the output disk image. 298 system_qemu_config: The path to the config that provides the 299 parition information on the disk. 300 get_image: A function that takes the partition name as the 301 parameter and returns the image path. 302 """ 303 mk_combined_img = self._GetBinary(_MK_COMBINED_IMG) 304 sgdisk = self._GetBinary(_SGDISK) 305 simg2img = self._GetBinary(_SIMG2IMG) 306 307 new_config_path = None 308 try: 309 with open(system_qemu_config_path, "r") as config: 310 with tempfile.NamedTemporaryFile( 311 prefix="system-qemu-config_", suffix=".txt", 312 delete=False) as new_config: 313 new_config_path = new_config.name 314 self._RewriteSystemQemuConfig(new_config, config, 315 get_image) 316 317 mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img} 318 self._ExecuteCommand(mk_combined_img, 319 "-i", new_config_path, 320 "-o", output_path, 321 env=mk_combined_img_env) 322 finally: 323 if new_config_path: 324 os.remove(new_config_path) 325