#!/usr/bin/env python
#
# Copyright (C) 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Given a target-files zipfile that does not contain images (i.e. does not have
an IMAGES/ top-level subdirectory), produce the images and add them to the
zipfile.

Usage: add_img_to_target_files [flag] target_files

  -a  (--add_missing)
      Build and add missing images to "IMAGES/". If this option is not
      specified, this script will simply exit when the "IMAGES/" directory
      already exists in the target files.

  -r  (--rebuild_recovery)
      Rebuild the recovery patch and write it to the system image. Only
      meaningful when the system image needs to be rebuilt and there are
      separate boot / recovery images.

  --replace_verity_private_key
      Replace the private key used for verity signing. (same as the option
      in sign_target_files_apks)

  --replace_verity_public_key
      Replace the certificate (public key) used for verity verification.
      (same as the option in sign_target_files_apks)

  --is_signing
      Skip building & adding the images for "userdata" and "cache" if we are
      signing the target files.
"""

from __future__ import print_function

import avbtool
import datetime
import logging
import os
import shlex
import shutil
import stat
import sys
import uuid
import tempfile
import zipfile

import build_image
import build_super_image
import common
import verity_utils
import ota_metadata_pb2
import rangelib
import sparse_img

from apex_utils import GetApexInfoFromTargetFiles
from common import (ZipDelete, PARTITIONS_WITH_CARE_MAP, ExternalError,
                    RunAndCheckOutput, IsSparseImage, MakeTempFile, ZipWrite)

if sys.hexversion < 0x02070000:
  print("Python 2.7 or newer is required.", file=sys.stderr)
  sys.exit(1)

logger = logging.getLogger(__name__)

OPTIONS = common.OPTIONS
OPTIONS.add_missing = False
OPTIONS.rebuild_recovery = False
OPTIONS.replace_updated_files_list = []
OPTIONS.is_signing = False

# Use a fixed timestamp (01/01/2009 00:00:00 UTC) for files when packaging
# images. (b/24377993, b/80600931)
FIXED_FILE_TIMESTAMP = int((
    datetime.datetime(2009, 1, 1, 0, 0, 0, 0, None) -
    datetime.datetime.utcfromtimestamp(0)).total_seconds())


def ParseAvbFooter(img_path) -> avbtool.AvbFooter:
  with open(img_path, 'rb') as fp:
    fp.seek(-avbtool.AvbFooter.SIZE, os.SEEK_END)
    data = fp.read(avbtool.AvbFooter.SIZE)
    return avbtool.AvbFooter(data)


def GetCareMap(which, imgname):
  """Returns the care_map string for the given partition.

  Args:
    which: The partition name, must be listed in PARTITIONS_WITH_CARE_MAP.
    imgname: The filename of the image.

  Returns:
    (which, care_map_ranges): care_map_ranges is the raw string of the care_map
    RangeSet; or None.
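
  None is returned if a valid AVB footer cannot be parsed from the image, in
  which case the caller skips the partition.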
  """
  assert which in PARTITIONS_WITH_CARE_MAP

  is_sparse_img = IsSparseImage(imgname)
  unsparsed_image_size = os.path.getsize(imgname)

  # A verified image contains the original image + hash tree data + FEC data
  # + AVB footer, all concatenated together. The care map specifies the range
  # of blocks that update_verifier should read on top of the dm-verity device
  # to verify the correctness of OTA updates. When reading off of the dm-verity
  # device, the hash tree and FEC parts of the image aren't available, so the
  # care map should only contain the original image blocks.
  try:
    avbfooter = None
    if is_sparse_img:
      with tempfile.NamedTemporaryFile() as tmpfile:
        img = sparse_img.SparseImage(imgname)
        unsparsed_image_size = img.total_blocks * img.blocksize
        for data in img.ReadBlocks(img.total_blocks - 1, 1):
          tmpfile.write(data)
        tmpfile.flush()
        avbfooter = ParseAvbFooter(tmpfile.name)
    else:
      avbfooter = ParseAvbFooter(imgname)
  except LookupError as e:
    logger.warning(
        "Failed to parse avbfooter for partition %s image %s, %s",
        which, imgname, e)
    return None

  image_size = avbfooter.original_image_size
  assert image_size < unsparsed_image_size, \
      f"AVB footer's original image size {image_size} is larger than or " \
      f"equal to the image size on disk {unsparsed_image_size}; this can't " \
      "happen because a verified image = original image + hash tree data + " \
      "FEC data + avbfooter."
  assert image_size > 0

  image_blocks = int(image_size) // 4096 - 1
  # It's OK for image_blocks to be 0, because care map ranges are inclusive.
  # So 0-0 means "just block 0", which is valid.
  assert image_blocks >= 0, \
      "blocks for {} must be non-negative, image size: {}".format(
          which, image_size)

  # For sparse images, we will only check the blocks that are listed in the
  # care map, i.e. the ones with meaningful data.
  if is_sparse_img:
    simg = sparse_img.SparseImage(imgname)
    care_map_ranges = simg.care_map.intersect(
        rangelib.RangeSet("0-{}".format(image_blocks)))

  # Otherwise for non-sparse images, we read all the blocks in the filesystem
  # image.
  else:
    care_map_ranges = rangelib.RangeSet("0-{}".format(image_blocks))

  return [which, care_map_ranges.to_string_raw()]


def AddCareMapForAbOta(output_file, ab_partitions, image_paths):
  """Generates and adds care_map.pb for each A/B partition that has a care_map.

  Args:
    output_file: The output zip file (needs to be already open),
        or file path to write care_map.pb.
    ab_partitions: The list of A/B partitions.
    image_paths: A map from the partition name to the image path.
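        For example, {"system": "<input_tmp>/IMAGES/system.img"}.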
  """
  if not output_file:
    raise ExternalError('Expected output_file for AddCareMapForAbOta')

  care_map_list = []
  for partition in ab_partitions:
    partition = partition.strip()
    if partition not in PARTITIONS_WITH_CARE_MAP:
      continue

    verity_block_device = "{}_verity_block_device".format(partition)
    avb_hashtree_enable = "avb_{}_hashtree_enable".format(partition)
    if (verity_block_device in OPTIONS.info_dict or
            OPTIONS.info_dict.get(avb_hashtree_enable) == "true"):
      if partition not in image_paths:
        logger.warning('Potential partition with care_map missing from '
                       'images: %s', partition)
        continue
      image_path = image_paths[partition]
      if not os.path.exists(image_path):
        raise ExternalError('Expected image at path {}'.format(image_path))

      care_map = GetCareMap(partition, image_path)
      if not care_map:
        continue
      care_map_list += care_map

      # Adds the fingerprint field to the care_map.
      # TODO(xunchang) revisit the fingerprint calculation for care_map.
      partition_props = OPTIONS.info_dict.get(partition + ".build.prop")
      prop_name_list = ["ro.{}.build.fingerprint".format(partition),
                        "ro.{}.build.thumbprint".format(partition)]

      present_props = [x for x in prop_name_list if
                       partition_props and partition_props.GetProp(x)]
      if not present_props:
        logger.warning(
            "fingerprint is not present for partition %s", partition)
        property_id, fingerprint = "unknown", "unknown"
      else:
        property_id = present_props[0]
        fingerprint = partition_props.GetProp(property_id)
      care_map_list += [property_id, fingerprint]

  if not care_map_list:
    return

  # Converts the list into a proto buf message by calling care_map_generator,
  # and writes the result to a temp file.
  temp_care_map_text = MakeTempFile(prefix="caremap_text-",
                                    suffix=".txt")
  with open(temp_care_map_text, 'w') as text_file:
    text_file.write('\n'.join(care_map_list))

  temp_care_map = MakeTempFile(prefix="caremap-", suffix=".pb")
  care_map_gen_cmd = ["care_map_generator", temp_care_map_text, temp_care_map]
  RunAndCheckOutput(care_map_gen_cmd)

  if not isinstance(output_file, zipfile.ZipFile):
    shutil.copy(temp_care_map, output_file)
    return
  # output_file is a zip file.
  care_map_path = "META/care_map.pb"
  if care_map_path in output_file.namelist():
    # Copy the temp file into the OPTIONS.input_tmp dir and update the
    # replace_updated_files_list used by add_img_to_target_files.
    if not OPTIONS.replace_updated_files_list:
      OPTIONS.replace_updated_files_list = []
    shutil.copy(temp_care_map, os.path.join(OPTIONS.input_tmp, care_map_path))
    OPTIONS.replace_updated_files_list.append(care_map_path)
  else:
    ZipWrite(output_file, temp_care_map, arcname=care_map_path)


class OutputFile(object):
  """A helper class to write a generated file to the given dir or zip.

  When generating images, we want the outputs to go into the given zip file, or
  the given dir.

  Attributes:
    name: The name of the output file, regardless of the final destination.
  """

  def __init__(self, output_zip, input_dir, *args):
    # We write the intermediate output file under the given input_dir, even if
    # the final destination is a zip archive.
    self.name = os.path.join(input_dir, *args)
    self._output_zip = output_zip
    if self._output_zip:
      self._zip_name = os.path.join(*args)

  def Write(self, compress_type=None):
    if self._output_zip:
      common.ZipWrite(self._output_zip, self.name,
                      self._zip_name, compress_type=compress_type)


def AddSystem(output_zip, recovery_img=None, boot_img=None):
  """Turn the contents of SYSTEM into a system image and store it in
  output_zip. Returns the name of the system image file."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "system.img")
  if os.path.exists(img.name):
    logger.info("system.img already exists; no need to rebuild...")
    return img.name

  def output_sink(fn, data):
    output_file = os.path.join(OPTIONS.input_tmp, "SYSTEM", fn)
    with open(output_file, "wb") as ofile:
      ofile.write(data)

    if output_zip:
      arc_name = "SYSTEM/" + fn
      if arc_name in output_zip.namelist():
        OPTIONS.replace_updated_files_list.append(arc_name)
      else:
        common.ZipWrite(output_zip, output_file, arc_name)

  board_uses_vendorimage = OPTIONS.info_dict.get(
      "board_uses_vendorimage") == "true"

  if (OPTIONS.rebuild_recovery and not board_uses_vendorimage and
          recovery_img is not None and boot_img is not None):
    logger.info("Building new recovery patch on system at system/vendor")
    common.MakeRecoveryPatch(OPTIONS.input_tmp, output_sink, recovery_img,
                             boot_img, info_dict=OPTIONS.info_dict)

  block_list = OutputFile(output_zip, OPTIONS.input_tmp,
                          "IMAGES", "system.map")
  CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "system", img,
              block_list=block_list)
  return img.name


def AddSystemOther(output_zip):
  """Turn the contents of SYSTEM_OTHER into a system_other image
  and store it in output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "system_other.img")
  if os.path.exists(img.name):
    logger.info("system_other.img already exists; no need to rebuild...")
    return

  CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "system_other", img)


def AddVendor(output_zip, recovery_img=None, boot_img=None):
  """Turn the contents of VENDOR into a vendor image and store it in
  output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "vendor.img")
  if os.path.exists(img.name):
    logger.info("vendor.img already exists; no need to rebuild...")
    return img.name

  def output_sink(fn, data):
    output_file = os.path.join(OPTIONS.input_tmp, "VENDOR", fn)
    with open(output_file, "wb") as ofile:
      ofile.write(data)

    if output_zip:
      arc_name = "VENDOR/" + fn
      if arc_name in output_zip.namelist():
        OPTIONS.replace_updated_files_list.append(arc_name)
      else:
        common.ZipWrite(output_zip, output_file, arc_name)

  board_uses_vendorimage = OPTIONS.info_dict.get(
      "board_uses_vendorimage") == "true"

  if (OPTIONS.rebuild_recovery and board_uses_vendorimage and
          recovery_img is not None and boot_img is not None):
    logger.info("Building new recovery patch on vendor")
    common.MakeRecoveryPatch(OPTIONS.input_tmp, output_sink, recovery_img,
                             boot_img, info_dict=OPTIONS.info_dict)

  block_list = OutputFile(output_zip, OPTIONS.input_tmp,
                          "IMAGES", "vendor.map")
  CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "vendor", img,
              block_list=block_list)
  return img.name


def AddProduct(output_zip):
  """Turn the contents of PRODUCT into a product image and store it in
  output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "product.img")
  if os.path.exists(img.name):
    logger.info("product.img already exists; no need to rebuild...")
    return img.name

  block_list = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "product.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "product", img,
      block_list=block_list)
  return img.name


def AddSystemExt(output_zip):
  """Turn the contents of SYSTEM_EXT into a system_ext image and store it in
  output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES",
                   "system_ext.img")
  if os.path.exists(img.name):
    logger.info("system_ext.img already exists; no need to rebuild...")
    return img.name

  block_list = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system_ext.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "system_ext", img,
      block_list=block_list)
  return img.name


def AddOdm(output_zip):
  """Turn the contents of ODM into an odm image and store it in output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "odm.img")
  if os.path.exists(img.name):
    logger.info("odm.img already exists; no need to rebuild...")
    return img.name

  block_list = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "odm.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "odm", img,
      block_list=block_list)
  return img.name


def AddVendorDlkm(output_zip):
  """Turn the contents of VENDOR_DLKM into a vendor_dlkm image and store it
  in output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "vendor_dlkm.img")
  if os.path.exists(img.name):
    logger.info("vendor_dlkm.img already exists; no need to rebuild...")
    return img.name

  block_list = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "vendor_dlkm.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "vendor_dlkm", img,
      block_list=block_list)
  return img.name


def AddOdmDlkm(output_zip):
  """Turn the contents of ODM_DLKM into an odm_dlkm image and store it in
  output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "odm_dlkm.img")
  if os.path.exists(img.name):
    logger.info("odm_dlkm.img already exists; no need to rebuild...")
    return img.name

  block_list = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "odm_dlkm.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "odm_dlkm", img,
      block_list=block_list)
  return img.name


def AddSystemDlkm(output_zip):
  """Turn the contents of SYSTEM_DLKM into a system_dlkm image and store it
  in output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "system_dlkm.img")
  if os.path.exists(img.name):
    logger.info("system_dlkm.img already exists; no need to rebuild...")
    return img.name

  block_list = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system_dlkm.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "system_dlkm", img,
      block_list=block_list)
  return img.name


def AddDtbo(output_zip):
  """Adds the DTBO image.

  Uses the image under IMAGES/ if it already exists. Otherwise looks for the
  image under PREBUILT_IMAGES/, signs it as needed, and returns the image name.
  """
  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "dtbo.img")
  if os.path.exists(img.name):
    logger.info("dtbo.img already exists; no need to rebuild...")
    return img.name

  dtbo_prebuilt_path = os.path.join(
      OPTIONS.input_tmp, "PREBUILT_IMAGES", "dtbo.img")
  assert os.path.exists(dtbo_prebuilt_path)
  shutil.copy(dtbo_prebuilt_path, img.name)

  # AVB-sign the image as needed.
  if OPTIONS.info_dict.get("avb_enable") == "true":
    # Signing requires +w
    os.chmod(img.name, os.stat(img.name).st_mode | stat.S_IWUSR)

    avbtool = OPTIONS.info_dict["avb_avbtool"]
    part_size = OPTIONS.info_dict["dtbo_size"]
    # The AVB hash footer will be replaced if already present.
    cmd = [avbtool, "add_hash_footer", "--image", img.name,
           "--partition_size", str(part_size), "--partition_name", "dtbo"]
    common.AppendAVBSigningArgs(cmd, "dtbo")
    args = OPTIONS.info_dict.get("avb_dtbo_add_hash_footer_args")
    if args and args.strip():
      cmd.extend(shlex.split(args))
    common.RunAndCheckOutput(cmd)

  img.Write()
  return img.name


def AddPvmfw(output_zip):
  """Adds the pvmfw image.

  Uses the image under IMAGES/ if it already exists. Otherwise looks for the
  image under PREBUILT_IMAGES/, signs it as needed, and returns the image name.
  """
  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "pvmfw.img")
  if os.path.exists(img.name):
    logger.info("pvmfw.img already exists; no need to rebuild...")
    return img.name

  pvmfw_prebuilt_path = os.path.join(
      OPTIONS.input_tmp, "PREBUILT_IMAGES", "pvmfw.img")
  assert os.path.exists(pvmfw_prebuilt_path)
  shutil.copy(pvmfw_prebuilt_path, img.name)

  # AVB-sign the image as needed.
  if OPTIONS.info_dict.get("avb_enable") == "true":
    # Signing requires +w
    os.chmod(img.name, os.stat(img.name).st_mode | stat.S_IWUSR)

    avbtool = OPTIONS.info_dict["avb_avbtool"]
    part_size = OPTIONS.info_dict["pvmfw_size"]
    # The AVB hash footer will be replaced if already present.
    cmd = [avbtool, "add_hash_footer", "--image", img.name,
           "--partition_size", str(part_size), "--partition_name", "pvmfw"]
    common.AppendAVBSigningArgs(cmd, "pvmfw")
    args = OPTIONS.info_dict.get("avb_pvmfw_add_hash_footer_args")
    if args and args.strip():
      cmd.extend(shlex.split(args))
    common.RunAndCheckOutput(cmd)

  img.Write()
  return img.name


def AddCustomImages(output_zip, partition_name):
  """Adds and signs custom images in IMAGES/.

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        write images to OPTIONS.input_tmp/.

  Uses the image under IMAGES/ if it already exists. Otherwise looks for the
  image under PREBUILT_IMAGES/, signs it as needed, and returns the image name.

  Raises:
    AssertionError: If image can't be found.
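
  Returns:
    The path to IMAGES/<partition_name>.img under OPTIONS.input_tmp, which is
    asserted to exist once all images listed in
    avb_<partition_name>_image_list have been processed.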
  """

  key_path = OPTIONS.info_dict.get("avb_{}_key_path".format(partition_name))
  algorithm = OPTIONS.info_dict.get("avb_{}_algorithm".format(partition_name))
  extra_args = OPTIONS.info_dict.get(
      "avb_{}_add_hashtree_footer_args".format(partition_name))
  partition_size = OPTIONS.info_dict.get(
      "avb_{}_partition_size".format(partition_name))

  builder = verity_utils.CreateCustomImageBuilder(
      OPTIONS.info_dict, partition_name, partition_size,
      key_path, algorithm, extra_args)

  for img_name in OPTIONS.info_dict.get(
          "avb_{}_image_list".format(partition_name)).split():
    custom_image = OutputFile(
        output_zip, OPTIONS.input_tmp, "IMAGES", img_name)
    if os.path.exists(custom_image.name):
      continue

    custom_image_prebuilt_path = os.path.join(
        OPTIONS.input_tmp, "PREBUILT_IMAGES", img_name)
    assert os.path.exists(custom_image_prebuilt_path), \
        "Failed to find %s at %s" % (img_name, custom_image_prebuilt_path)

    shutil.copy(custom_image_prebuilt_path, custom_image.name)

    if builder is not None:
      builder.Build(custom_image.name)

    custom_image.Write()

  default = os.path.join(OPTIONS.input_tmp, "IMAGES", partition_name + ".img")
  assert os.path.exists(default), \
      "There should be one %s.img" % (partition_name)
  return default


def CreateImage(input_dir, info_dict, what, output_file, block_list=None):
  logger.info("creating %s.img...", what)

  image_props = build_image.ImagePropFromGlobalDict(info_dict, what)
  image_props["timestamp"] = FIXED_FILE_TIMESTAMP

  if what == "system":
    fs_config_prefix = ""
  else:
    fs_config_prefix = what + "_"

  fs_config = os.path.join(
      input_dir, "META/" + fs_config_prefix + "filesystem_config.txt")
  if not os.path.exists(fs_config):
    fs_config = None

  # Override values loaded from info_dict.
  if fs_config:
    image_props["fs_config"] = fs_config
  if block_list:
    image_props["block_list"] = block_list.name

  # Use repeatable ext4 FS UUID and hash_seed UUID (based on partition name and
  # build fingerprint). Also use the legacy build id, because the vbmeta digest
  # isn't available at this point.
  build_info = common.BuildInfo(info_dict, use_legacy_id=True)
  uuid_seed = what + "-" + build_info.GetPartitionFingerprint(what)
  image_props["uuid"] = str(uuid.uuid5(uuid.NAMESPACE_URL, uuid_seed))
  hash_seed = "hash_seed-" + uuid_seed
  image_props["hash_seed"] = str(uuid.uuid5(uuid.NAMESPACE_URL, hash_seed))

  build_image.BuildImage(
      os.path.join(input_dir, what.upper()), image_props, output_file.name)

  output_file.Write()
  if block_list:
    block_list.Write()

  # Record the image size in info_dict as '<what>_image_size'.
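  # (Only recorded for verity/AVB-protected partitions; e.g. building
  # system.img stores info_dict["system_image_size"].)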
  is_verity_partition = "verity_block_device" in image_props
  verity_supported = (image_props.get("avb_enable") == "true")
  is_avb_enable = image_props.get("avb_hashtree_enable") == "true"
  if verity_supported and (is_verity_partition or is_avb_enable):
    image_size = image_props.get("image_size")
    if image_size:
      image_size_key = what + "_image_size"
      info_dict[image_size_key] = int(image_size)

  use_dynamic_size = (
      info_dict.get("use_dynamic_partition_size") == "true" and
      what in shlex.split(info_dict.get("dynamic_partition_list", "").strip()))
  if use_dynamic_size:
    info_dict.update(build_image.GlobalDictFromImageProp(image_props, what))


def AddUserdata(output_zip):
  """Create a userdata image and store it in output_zip.

  In most cases we just create and store an empty userdata.img, but the caller
  can also request a userdata.img with real data from the target files, by
  setting "userdata_img_with_data=true" in OPTIONS.info_dict.
  """

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "userdata.img")
  if os.path.exists(img.name):
    logger.info("userdata.img already exists; no need to rebuild...")
    return

  # Skip userdata.img if no size.
  image_props = build_image.ImagePropFromGlobalDict(OPTIONS.info_dict, "data")
  if not image_props.get("partition_size"):
    return

  logger.info("creating userdata.img...")

  image_props["timestamp"] = FIXED_FILE_TIMESTAMP

  if OPTIONS.info_dict.get("userdata_img_with_data") == "true":
    user_dir = os.path.join(OPTIONS.input_tmp, "DATA")
  else:
    user_dir = common.MakeTempDir()

  build_image.BuildImage(user_dir, image_props, img.name)

  common.CheckSize(img.name, "userdata.img", OPTIONS.info_dict)
  # Always compress the userdata image, as it's likely huge and consists
  # mostly of 0s.
  img.Write(zipfile.ZIP_DEFLATED)


def AddVBMeta(output_zip, partitions, name, needed_partitions):
  """Creates a VBMeta image and stores it in output_zip.

  It generates the requested VBMeta image, which can be either the top-level
  vbmeta.img or a chained VBMeta image; this is determined based on the name.

  Args:
    output_zip: The output zip file, which needs to be already open.
    partitions: A dict that's keyed by partition names with image paths as
        values. Only valid partition names are accepted, i.e. partitions listed
        in common.AVB_PARTITIONS and custom partitions listed in
        OPTIONS.info_dict.get("avb_custom_images_partition_list").
    name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'.
    needed_partitions: Partitions whose descriptors should be included into the
        generated VBMeta image.

  Returns:
    Path to the created image.

  Raises:
    AssertionError: On invalid input args.
  """
  assert needed_partitions, "Needed partitions must be specified"

  img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "{}.img".format(name))
  if os.path.exists(img.name):
    logger.info("%s.img already exists; not rebuilding...", name)
    return img.name

  common.BuildVBMeta(img.name, partitions, name, needed_partitions)
  img.Write()
  return img.name


def AddPartitionTable(output_zip):
  """Create a partition table image and store it in output_zip."""

  img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "partition-table.img")
  bpt = OutputFile(
      output_zip, OPTIONS.input_tmp, "META", "partition-table.bpt")

  # Use BPTTOOL from the environment, or "bpttool" if empty or not set.
  bpttool = os.getenv("BPTTOOL") or "bpttool"
  cmd = [bpttool, "make_table", "--output_json", bpt.name,
         "--output_gpt", img.name]
  input_files_str = OPTIONS.info_dict["board_bpt_input_files"]
  input_files = input_files_str.split()
  for i in input_files:
    cmd.extend(["--input", i])
  disk_size = OPTIONS.info_dict.get("board_bpt_disk_size")
  if disk_size:
    cmd.extend(["--disk_size", disk_size])
  args = OPTIONS.info_dict.get("board_bpt_make_table_args")
  if args:
    cmd.extend(shlex.split(args))
  common.RunAndCheckOutput(cmd)

  img.Write()
  bpt.Write()


def AddCache(output_zip):
  """Create an empty cache image and store it in output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "cache.img")
  if os.path.exists(img.name):
    logger.info("cache.img already exists; no need to rebuild...")
    return

  image_props = build_image.ImagePropFromGlobalDict(OPTIONS.info_dict, "cache")
  # The build system has to explicitly request cache.img.
  if "fs_type" not in image_props:
    return

  logger.info("creating cache.img...")

  image_props["timestamp"] = FIXED_FILE_TIMESTAMP

  user_dir = common.MakeTempDir()
  build_image.BuildImage(user_dir, image_props, img.name)

  common.CheckSize(img.name, "cache.img", OPTIONS.info_dict)
  img.Write()


def CheckAbOtaImages(output_zip, ab_partitions):
  """Checks that all the listed A/B partitions have their images available.

  The images need to be available under IMAGES/ or RADIO/, with the former
  taking priority.

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        find images in OPTIONS.input_tmp/.
    ab_partitions: The list of A/B partitions.

  Raises:
    AssertionError: If it can't find an image.
  """
  for partition in ab_partitions:
    img_name = partition + ".img"

    # Assert that the image is present under IMAGES/ now.
    if output_zip:
      # Zip spec says: All slashes MUST be forward slashes.
      images_path = "IMAGES/" + img_name
      radio_path = "RADIO/" + img_name
      available = (images_path in output_zip.namelist() or
                   radio_path in output_zip.namelist())
    else:
      images_path = os.path.join(OPTIONS.input_tmp, "IMAGES", img_name)
      radio_path = os.path.join(OPTIONS.input_tmp, "RADIO", img_name)
      available = os.path.exists(images_path) or os.path.exists(radio_path)

    assert available, "Failed to find " + img_name


def AddPackRadioImages(output_zip, images):
  """Copies images listed in META/pack_radioimages.txt from RADIO/ to IMAGES/.
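
  Entries may omit the file extension, in which case ".img" is appended before
  the image is looked up under RADIO/.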

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        write images to OPTIONS.input_tmp/.
    images: A list of image names.

  Raises:
    AssertionError: If a listed image can't be found.
  """
  for image in images:
    img_name = image.strip()
    _, ext = os.path.splitext(img_name)
    if not ext:
      img_name += ".img"

    prebuilt_path = os.path.join(OPTIONS.input_tmp, "IMAGES", img_name)
    if os.path.exists(prebuilt_path):
      logger.info("%s already exists, no need to overwrite...", img_name)
      continue

    img_radio_path = os.path.join(OPTIONS.input_tmp, "RADIO", img_name)
    assert os.path.exists(img_radio_path), \
        "Failed to find %s at %s" % (img_name, img_radio_path)

    if output_zip:
      common.ZipWrite(output_zip, img_radio_path, "IMAGES/" + img_name)
    else:
      shutil.copy(img_radio_path, prebuilt_path)


def AddSuperEmpty(output_zip):
  """Create a super_empty.img and store it in output_zip."""

  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "super_empty.img")
  if os.path.exists(img.name):
    logger.info("super_empty.img already exists; no need to rebuild...")
    return
  build_super_image.BuildSuperImage(OPTIONS.info_dict, img.name)
  img.Write()


def AddSuperSplit(output_zip):
  """Create split super_*.img files and store them in output_zip."""

  outdir = os.path.join(OPTIONS.input_tmp, "OTA")
  built = build_super_image.BuildSuperImage(OPTIONS.input_tmp, outdir)

  if built:
    for dev in OPTIONS.info_dict['super_block_devices'].strip().split():
      img = OutputFile(output_zip, OPTIONS.input_tmp, "OTA",
                       "super_" + dev + ".img")
      img.Write()


def ReplaceUpdatedFiles(zip_filename, files_list):
  """Updates all the ZIP entries listed in files_list.

  For now the list includes META/care_map.pb, and the related files under
  SYSTEM/ after rebuilding recovery.
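
  Each listed entry is deleted from the zip and then re-added from the
  corresponding file under OPTIONS.input_tmp.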
  """
  common.ZipDelete(zip_filename, files_list)
  output_zip = zipfile.ZipFile(zip_filename, "a",
                               compression=zipfile.ZIP_DEFLATED,
                               allowZip64=True)
  for item in files_list:
    file_path = os.path.join(OPTIONS.input_tmp, item)
    assert os.path.exists(file_path)
    common.ZipWrite(output_zip, file_path, arcname=item)
  common.ZipClose(output_zip)


def HasPartition(partition_name):
  """Determines if the target files archive should build a given partition."""

  return ((os.path.isdir(
      os.path.join(OPTIONS.input_tmp, partition_name.upper())) and
      OPTIONS.info_dict.get(
          "building_{}_image".format(partition_name)) == "true") or
      os.path.exists(
          os.path.join(OPTIONS.input_tmp, "IMAGES",
                       "{}.img".format(partition_name))))


def AddApexInfo(output_zip):
  apex_infos = GetApexInfoFromTargetFiles(OPTIONS.input_tmp, 'system',
                                          compressed_only=False)
  apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
  apex_metadata_proto.apex_info.extend(apex_infos)
  apex_info_bytes = apex_metadata_proto.SerializeToString()

  output_file = os.path.join(OPTIONS.input_tmp, "META", "apex_info.pb")
  with open(output_file, "wb") as ofile:
    ofile.write(apex_info_bytes)
  if output_zip:
    arc_name = "META/apex_info.pb"
    if arc_name in output_zip.namelist():
      OPTIONS.replace_updated_files_list.append(arc_name)
    else:
      common.ZipWrite(output_zip, output_file, arc_name)


def AddVbmetaDigest(output_zip):
  """Write the vbmeta digest to the output dir and zipfile."""

  # Calculate the vbmeta digest and put the result into META/.
  boot_images = OPTIONS.info_dict.get("boot_images")
  # Disable the digest calculation if the target_file is used as a container
  # for boot images. A boot container might contain boot-5.4.img, boot-5.10.img
  # etc., instead of just a boot.img, and will fail the vbmeta digest
  # calculation.
  boot_container = boot_images and (
      len(boot_images.split()) >= 2 or boot_images.split()[0] != 'boot.img')
  if (OPTIONS.info_dict.get("avb_enable") == "true" and not boot_container and
          OPTIONS.info_dict.get("avb_building_vbmeta_image") == "true"):
    avbtool = OPTIONS.info_dict["avb_avbtool"]
    digest = verity_utils.CalculateVbmetaDigest(OPTIONS.input_tmp, avbtool)
    vbmeta_digest_txt = os.path.join(OPTIONS.input_tmp, "META",
                                     "vbmeta_digest.txt")
    with open(vbmeta_digest_txt, 'w') as f:
      f.write(digest)
    # Write to the output zipfile as well.
    if output_zip:
      arc_name = "META/vbmeta_digest.txt"
      if arc_name in output_zip.namelist():
        OPTIONS.replace_updated_files_list.append(arc_name)
      else:
        common.ZipWriteStr(output_zip, arc_name, digest)


def AddImagesToTargetFiles(filename):
  """Creates and adds images (boot/recovery/system/...) to a target_files.zip.

  It works with either a zip file (zip mode), or a directory that contains the
  files to be packed into a target_files.zip (dir mode). The latter is used
  when being called from build/make/core/Makefile.

  The images will be created under IMAGES/ in the input target_files.zip.

  Args:
    filename: the target_files.zip, or the zip root directory.
  """
  if os.path.isdir(filename):
    OPTIONS.input_tmp = os.path.abspath(filename)
  else:
    OPTIONS.input_tmp = common.UnzipTemp(filename)

  if not OPTIONS.add_missing:
    if os.path.isdir(os.path.join(OPTIONS.input_tmp, "IMAGES")):
      logger.warning("target_files appears to already contain images.")
      sys.exit(1)

  OPTIONS.info_dict = common.LoadInfoDict(OPTIONS.input_tmp, repacking=True)

  has_recovery = OPTIONS.info_dict.get("no_recovery") != "true"
  has_boot = OPTIONS.info_dict.get("no_boot") != "true"
  has_init_boot = OPTIONS.info_dict.get("init_boot") == "true"
  has_vendor_boot = OPTIONS.info_dict.get("vendor_boot") == "true"
  has_vendor_kernel_boot = OPTIONS.info_dict.get(
      "vendor_kernel_boot") == "true"

  # {vendor,odm,product,system_ext,vendor_dlkm,odm_dlkm,system_dlkm,system,
  # system_other}.img can be built from source, or dropped into
  # target_files.zip as a prebuilt blob.
  has_vendor = HasPartition("vendor")
  has_odm = HasPartition("odm")
  has_vendor_dlkm = HasPartition("vendor_dlkm")
  has_odm_dlkm = HasPartition("odm_dlkm")
  has_system_dlkm = HasPartition("system_dlkm")
  has_product = HasPartition("product")
  has_system_ext = HasPartition("system_ext")
  has_system = HasPartition("system")
  has_system_other = HasPartition("system_other")
  has_userdata = OPTIONS.info_dict.get("building_userdata_image") == "true"
  has_cache = OPTIONS.info_dict.get("building_cache_image") == "true"

  # Set up the output destination. It writes to the given directory for dir
  # mode; otherwise appends to the given ZIP.
  if os.path.isdir(filename):
    output_zip = None
  else:
    output_zip = zipfile.ZipFile(filename, "a",
                                 compression=zipfile.ZIP_DEFLATED,
                                 allowZip64=True)

  # Always make input_tmp/IMAGES available, since we may stage boot / recovery
  # images there even under zip mode. The directory will be cleaned up as part
  # of OPTIONS.input_tmp.
  images_dir = os.path.join(OPTIONS.input_tmp, "IMAGES")
  if not os.path.isdir(images_dir):
    os.makedirs(images_dir)

  # A map between partition names and their paths, which could be used when
  # generating AVB vbmeta image.
  partitions = {}

  def banner(s):
    logger.info("\n\n++++ %s ++++\n\n", s)

  boot_image = None
  if has_boot:
    banner("boot")
    boot_images = OPTIONS.info_dict.get("boot_images")
    if boot_images is None:
      boot_images = "boot.img"
    for index, b in enumerate(boot_images.split()):
      # common.GetBootableImage() returns the image directly if present.
      boot_image = common.GetBootableImage(
          "IMAGES/" + b, b, OPTIONS.input_tmp, "BOOT")
      # boot.img may be unavailable in some targets (e.g. aosp_arm64).
      if boot_image:
        boot_image_path = os.path.join(OPTIONS.input_tmp, "IMAGES", b)
        # Although multiple boot images can be generated, include the image
        # descriptor of only the first boot image in vbmeta.
        if index == 0:
          partitions['boot'] = boot_image_path
        if not os.path.exists(boot_image_path):
          boot_image.WriteToDir(OPTIONS.input_tmp)
          if output_zip:
            boot_image.AddToZip(output_zip)

  if has_init_boot:
    banner("init_boot")
    init_boot_image = common.GetBootableImage(
        "IMAGES/init_boot.img", "init_boot.img", OPTIONS.input_tmp, "INIT_BOOT",
        dev_nodes=True)
    if init_boot_image:
      partitions['init_boot'] = os.path.join(
          OPTIONS.input_tmp, "IMAGES", "init_boot.img")
      if not os.path.exists(partitions['init_boot']):
        init_boot_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          init_boot_image.AddToZip(output_zip)

  if has_vendor_boot:
    banner("vendor_boot")
    vendor_boot_image = common.GetVendorBootImage(
        "IMAGES/vendor_boot.img", "vendor_boot.img", OPTIONS.input_tmp,
        "VENDOR_BOOT")
    if vendor_boot_image:
      partitions['vendor_boot'] = os.path.join(OPTIONS.input_tmp, "IMAGES",
                                               "vendor_boot.img")
      if not os.path.exists(partitions['vendor_boot']):
        vendor_boot_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          vendor_boot_image.AddToZip(output_zip)

  if has_vendor_kernel_boot:
    banner("vendor_kernel_boot")
    vendor_kernel_boot_image = common.GetVendorKernelBootImage(
        "IMAGES/vendor_kernel_boot.img", "vendor_kernel_boot.img",
        OPTIONS.input_tmp, "VENDOR_KERNEL_BOOT")
    if vendor_kernel_boot_image:
      partitions['vendor_kernel_boot'] = os.path.join(
          OPTIONS.input_tmp, "IMAGES", "vendor_kernel_boot.img")
      if not os.path.exists(partitions['vendor_kernel_boot']):
        vendor_kernel_boot_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          vendor_kernel_boot_image.AddToZip(output_zip)

  recovery_image = None
  if has_recovery:
    banner("recovery")
    recovery_image = common.GetBootableImage(
        "IMAGES/recovery.img", "recovery.img", OPTIONS.input_tmp, "RECOVERY")
    assert recovery_image, "Failed to create recovery.img."
    partitions['recovery'] = os.path.join(
        OPTIONS.input_tmp, "IMAGES", "recovery.img")
    if not os.path.exists(partitions['recovery']):
      recovery_image.WriteToDir(OPTIONS.input_tmp)
      if output_zip:
        recovery_image.AddToZip(output_zip)

    banner("recovery (two-step image)")
    # The special recovery.img for two-step package use.
    recovery_two_step_image = common.GetBootableImage(
        "OTA/recovery-two-step.img", "recovery-two-step.img",
        OPTIONS.input_tmp, "RECOVERY", two_step_image=True)
    assert recovery_two_step_image, "Failed to create recovery-two-step.img."
    recovery_two_step_image_path = os.path.join(
        OPTIONS.input_tmp, "OTA", "recovery-two-step.img")
    if not os.path.exists(recovery_two_step_image_path):
      recovery_two_step_image.WriteToDir(OPTIONS.input_tmp)
      if output_zip:
        recovery_two_step_image.AddToZip(output_zip)

  def add_partition(partition, has_partition, add_func, add_args):
    if has_partition:
      banner(partition)
      partitions[partition] = add_func(output_zip, *add_args)

  add_partition_calls = (
      ("system", has_system, AddSystem, [recovery_image, boot_image]),
      ("vendor", has_vendor, AddVendor, [recovery_image, boot_image]),
      ("product", has_product, AddProduct, []),
      ("system_ext", has_system_ext, AddSystemExt, []),
      ("odm", has_odm, AddOdm, []),
      ("vendor_dlkm", has_vendor_dlkm, AddVendorDlkm, []),
      ("odm_dlkm", has_odm_dlkm, AddOdmDlkm, []),
      ("system_dlkm", has_system_dlkm, AddSystemDlkm, []),
      ("system_other", has_system_other, AddSystemOther, []),
  )
  for call in add_partition_calls:
    add_partition(*call)

  AddApexInfo(output_zip)

  if not OPTIONS.is_signing:
    banner("userdata")
    AddUserdata(output_zip)
    banner("cache")
    AddCache(output_zip)

  if OPTIONS.info_dict.get("board_bpt_enable") == "true":
    banner("partition-table")
    AddPartitionTable(output_zip)

  add_partition("dtbo",
                OPTIONS.info_dict.get("has_dtbo") == "true", AddDtbo, [])
  add_partition("pvmfw",
                OPTIONS.info_dict.get("has_pvmfw") == "true", AddPvmfw, [])

  # Custom images.
  custom_partitions = OPTIONS.info_dict.get(
      "avb_custom_images_partition_list", "").strip().split()
  for partition_name in custom_partitions:
    partition_name = partition_name.strip()
    banner("custom images for " + partition_name)
    partitions[partition_name] = AddCustomImages(output_zip, partition_name)

  if OPTIONS.info_dict.get("avb_enable") == "true":
    # vbmeta_partitions includes the partitions that should be included into
    # top-level vbmeta.img, which are the ones that are not included in any
    # chained VBMeta image plus the chained VBMeta images themselves.
    # Currently custom_partitions are all chained to VBMeta image.
    vbmeta_partitions = common.AVB_PARTITIONS[:] + tuple(custom_partitions)

    vbmeta_system = OPTIONS.info_dict.get("avb_vbmeta_system", "").strip()
    if vbmeta_system:
      banner("vbmeta_system")
      partitions["vbmeta_system"] = AddVBMeta(
          output_zip, partitions, "vbmeta_system", vbmeta_system.split())
      vbmeta_partitions = [
          item for item in vbmeta_partitions
          if item not in vbmeta_system.split()]
      vbmeta_partitions.append("vbmeta_system")

    vbmeta_vendor = OPTIONS.info_dict.get("avb_vbmeta_vendor", "").strip()
    if vbmeta_vendor:
      banner("vbmeta_vendor")
      partitions["vbmeta_vendor"] = AddVBMeta(
          output_zip, partitions, "vbmeta_vendor", vbmeta_vendor.split())
      vbmeta_partitions = [
          item for item in vbmeta_partitions
          if item not in vbmeta_vendor.split()]
      vbmeta_partitions.append("vbmeta_vendor")

    custom_avb_partitions = OPTIONS.info_dict.get(
        "avb_custom_vbmeta_images_partition_list", "").strip().split()
    if custom_avb_partitions:
      for avb_part in custom_avb_partitions:
        partition_name = "vbmeta_" + avb_part
        included_partitions = OPTIONS.info_dict.get(
            "avb_vbmeta_{}".format(avb_part), "").strip().split()
        assert included_partitions, \
            "Custom vbmeta partition {0} missing avb_vbmeta_{0} prop".format(
                avb_part)
        banner(partition_name)
        logger.info("VBMeta partition {} needs {}".format(
            partition_name, included_partitions))
        partitions[partition_name] = AddVBMeta(
            output_zip, partitions, partition_name, included_partitions)
        vbmeta_partitions = [
            item for item in vbmeta_partitions
            if item not in included_partitions]
        vbmeta_partitions.append(partition_name)

    if OPTIONS.info_dict.get("avb_building_vbmeta_image") == "true":
      banner("vbmeta")
      AddVBMeta(output_zip, partitions, "vbmeta", vbmeta_partitions)

  if OPTIONS.info_dict.get("use_dynamic_partitions") == "true":
    if OPTIONS.info_dict.get("build_super_empty_partition") == "true":
      banner("super_empty")
      AddSuperEmpty(output_zip)

    if OPTIONS.info_dict.get("build_super_partition") == "true":
      if OPTIONS.info_dict.get(
          "build_retrofit_dynamic_partitions_ota_package") == "true":
        banner("super split images")
        AddSuperSplit(output_zip)

  banner("radio")
  ab_partitions_txt = os.path.join(OPTIONS.input_tmp, "META",
                                   "ab_partitions.txt")
  if os.path.exists(ab_partitions_txt):
    with open(ab_partitions_txt) as f:
      ab_partitions = f.read().splitlines()

    # For devices using A/B update, make sure we have all the needed images
    # ready under IMAGES/ or RADIO/.
    CheckAbOtaImages(output_zip, ab_partitions)

    # Generate care_map.pb for ab_partitions, then write this file to
    # target_files package.
    output_care_map = os.path.join(OPTIONS.input_tmp, "META", "care_map.pb")
    AddCareMapForAbOta(output_zip if output_zip else output_care_map,
                       ab_partitions, partitions)

  # Radio images that need to be packed into IMAGES/, and product-img.zip.
  pack_radioimages_txt = os.path.join(
      OPTIONS.input_tmp, "META", "pack_radioimages.txt")
  if os.path.exists(pack_radioimages_txt):
    with open(pack_radioimages_txt) as f:
      AddPackRadioImages(output_zip, f.readlines())

  AddVbmetaDigest(output_zip)

  if output_zip:
    common.ZipClose(output_zip)
    if OPTIONS.replace_updated_files_list:
      ReplaceUpdatedFiles(output_zip.filename,
                          OPTIONS.replace_updated_files_list)


def OptimizeCompressedEntries(zipfile_path):
  """Convert files that do not compress well to uncompressed storage.

  EROFS images tend to be compressed already, so compressing them again
  yields little space savings. Leaving them uncompressed will make
  downstream tooling's job easier, and save compute time.
  """
  if not zipfile.is_zipfile(zipfile_path):
    return
  entries_to_store = []
  with tempfile.TemporaryDirectory() as tmpdir:
    with zipfile.ZipFile(zipfile_path, "r", allowZip64=True) as zfp:
      for zinfo in zfp.filelist:
        if not zinfo.filename.startswith("IMAGES/") and not zinfo.filename.startswith("META"):
          continue
        # Don't try to store userdata.img uncompressed, it's usually huge.
        if zinfo.filename.endswith("userdata.img"):
          continue
        if zinfo.compress_size > zinfo.file_size * 0.80 and zinfo.compress_type != zipfile.ZIP_STORED:
          entries_to_store.append(zinfo)
          zfp.extract(zinfo, tmpdir)
    if len(entries_to_store) == 0:
      return
    # Remove these entries, then re-add them as ZIP_STORED.
    ZipDelete(zipfile_path, [entry.filename for entry in entries_to_store])
    with zipfile.ZipFile(zipfile_path, "a", allowZip64=True) as zfp:
      for entry in entries_to_store:
        zfp.write(os.path.join(tmpdir, entry.filename),
                  entry.filename, compress_type=zipfile.ZIP_STORED)


def main(argv):
  def option_handler(o, a):
    if o in ("-a", "--add_missing"):
      OPTIONS.add_missing = True
    elif o in ("-r", "--rebuild_recovery",):
      OPTIONS.rebuild_recovery = True
    elif o == "--replace_verity_private_key":
      raise ValueError("--replace_verity_private_key is no longer supported,"
                       " please switch to AVB")
    elif o == "--replace_verity_public_key":
      raise ValueError("--replace_verity_public_key is no longer supported,"
                       " please switch to AVB")
    elif o == "--is_signing":
      OPTIONS.is_signing = True
    else:
      return False
    return True

  args = common.ParseOptions(
      argv, __doc__, extra_opts="ar",
      extra_long_opts=["add_missing", "rebuild_recovery",
                       "replace_verity_public_key=",
                       "replace_verity_private_key=",
                       "is_signing"],
      extra_option_handler=option_handler)

  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)

  common.InitLogging()

  AddImagesToTargetFiles(args[0])
  OptimizeCompressedEntries(args[0])
  logger.info("done.")


if __name__ == '__main__':
  try:
    common.CloseInheritedPipes()
    main(sys.argv[1:])
  finally:
    common.Cleanup()