1#!/usr/bin/env python 2# 3# Copyright (C) 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18Given a target-files zipfile that does not contain images (ie, does 19not have an IMAGES/ top-level subdirectory), produce the images and 20add them to the zipfile. 21 22Usage: add_img_to_target_files [flag] target_files 23 24 -a (--add_missing) 25 Build and add missing images to "IMAGES/". If this option is 26 not specified, this script will simply exit when "IMAGES/" 27 directory exists in the target file. 28 29 -r (--rebuild_recovery) 30 Rebuild the recovery patch and write it to the system image. Only 31 meaningful when system image needs to be rebuilt and there're separate 32 boot / recovery images. 33 34 --replace_verity_private_key 35 Replace the private key used for verity signing. (same as the option 36 in sign_target_files_apks) 37 38 --replace_verity_public_key 39 Replace the certificate (public key) used for verity verification. (same 40 as the option in sign_target_files_apks) 41 42 --is_signing 43 Skip building & adding the images for "userdata" and "cache" if we 44 are signing the target files. 45""" 46 47from __future__ import print_function 48 49import datetime 50import logging 51import os 52import shlex 53import shutil 54import stat 55import sys 56import uuid 57import tempfile 58import zipfile 59 60import build_image 61import build_super_image 62import common 63import verity_utils 64import ota_metadata_pb2 65 66from apex_utils import GetApexInfoFromTargetFiles 67from common import AddCareMapForAbOta, ZipDelete 68 69if sys.hexversion < 0x02070000: 70 print("Python 2.7 or newer is required.", file=sys.stderr) 71 sys.exit(1) 72 73logger = logging.getLogger(__name__) 74 75OPTIONS = common.OPTIONS 76OPTIONS.add_missing = False 77OPTIONS.rebuild_recovery = False 78OPTIONS.replace_updated_files_list = [] 79OPTIONS.replace_verity_public_key = False 80OPTIONS.replace_verity_private_key = False 81OPTIONS.is_signing = False 82 83# Use a fixed timestamp (01/01/2009 00:00:00 UTC) for files when packaging 84# images. (b/24377993, b/80600931) 85FIXED_FILE_TIMESTAMP = int(( 86 datetime.datetime(2009, 1, 1, 0, 0, 0, 0, None) - 87 datetime.datetime.utcfromtimestamp(0)).total_seconds()) 88 89 90class OutputFile(object): 91 """A helper class to write a generated file to the given dir or zip. 92 93 When generating images, we want the outputs to go into the given zip file, or 94 the given dir. 95 96 Attributes: 97 name: The name of the output file, regardless of the final destination. 98 """ 99 100 def __init__(self, output_zip, input_dir, *args): 101 # We write the intermediate output file under the given input_dir, even if 102 # the final destination is a zip archive. 103 self.name = os.path.join(input_dir, *args) 104 self._output_zip = output_zip 105 if self._output_zip: 106 self._zip_name = os.path.join(*args) 107 108 def Write(self, compress_type=None): 109 if self._output_zip: 110 common.ZipWrite(self._output_zip, self.name, 111 self._zip_name, compress_type=compress_type) 112 113 114def AddSystem(output_zip, recovery_img=None, boot_img=None): 115 """Turn the contents of SYSTEM into a system image and store it in 116 output_zip. Returns the name of the system image file.""" 117 118 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "system.img") 119 if os.path.exists(img.name): 120 logger.info("system.img already exists; no need to rebuild...") 121 return img.name 122 123 def output_sink(fn, data): 124 output_file = os.path.join(OPTIONS.input_tmp, "SYSTEM", fn) 125 with open(output_file, "wb") as ofile: 126 ofile.write(data) 127 128 if output_zip: 129 arc_name = "SYSTEM/" + fn 130 if arc_name in output_zip.namelist(): 131 OPTIONS.replace_updated_files_list.append(arc_name) 132 else: 133 common.ZipWrite(output_zip, output_file, arc_name) 134 135 board_uses_vendorimage = OPTIONS.info_dict.get( 136 "board_uses_vendorimage") == "true" 137 138 if (OPTIONS.rebuild_recovery and not board_uses_vendorimage and 139 recovery_img is not None and boot_img is not None): 140 logger.info("Building new recovery patch on system at system/vendor") 141 common.MakeRecoveryPatch(OPTIONS.input_tmp, output_sink, recovery_img, 142 boot_img, info_dict=OPTIONS.info_dict) 143 144 block_list = OutputFile(output_zip, OPTIONS.input_tmp, 145 "IMAGES", "system.map") 146 CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "system", img, 147 block_list=block_list) 148 return img.name 149 150 151def AddSystemOther(output_zip): 152 """Turn the contents of SYSTEM_OTHER into a system_other image 153 and store it in output_zip.""" 154 155 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "system_other.img") 156 if os.path.exists(img.name): 157 logger.info("system_other.img already exists; no need to rebuild...") 158 return 159 160 CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "system_other", img) 161 162 163def AddVendor(output_zip, recovery_img=None, boot_img=None): 164 """Turn the contents of VENDOR into a vendor image and store in it 165 output_zip.""" 166 167 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "vendor.img") 168 if os.path.exists(img.name): 169 logger.info("vendor.img already exists; no need to rebuild...") 170 return img.name 171 172 def output_sink(fn, data): 173 output_file = os.path.join(OPTIONS.input_tmp, "VENDOR", fn) 174 with open(output_file, "wb") as ofile: 175 ofile.write(data) 176 177 if output_zip: 178 arc_name = "VENDOR/" + fn 179 if arc_name in output_zip.namelist(): 180 OPTIONS.replace_updated_files_list.append(arc_name) 181 else: 182 common.ZipWrite(output_zip, output_file, arc_name) 183 184 board_uses_vendorimage = OPTIONS.info_dict.get( 185 "board_uses_vendorimage") == "true" 186 187 if (OPTIONS.rebuild_recovery and board_uses_vendorimage and 188 recovery_img is not None and boot_img is not None): 189 logger.info("Building new recovery patch on vendor") 190 common.MakeRecoveryPatch(OPTIONS.input_tmp, output_sink, recovery_img, 191 boot_img, info_dict=OPTIONS.info_dict) 192 193 block_list = OutputFile(output_zip, OPTIONS.input_tmp, 194 "IMAGES", "vendor.map") 195 CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "vendor", img, 196 block_list=block_list) 197 return img.name 198 199 200def AddProduct(output_zip): 201 """Turn the contents of PRODUCT into a product image and store it in 202 output_zip.""" 203 204 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "product.img") 205 if os.path.exists(img.name): 206 logger.info("product.img already exists; no need to rebuild...") 207 return img.name 208 209 block_list = OutputFile( 210 output_zip, OPTIONS.input_tmp, "IMAGES", "product.map") 211 CreateImage( 212 OPTIONS.input_tmp, OPTIONS.info_dict, "product", img, 213 block_list=block_list) 214 return img.name 215 216 217def AddSystemExt(output_zip): 218 """Turn the contents of SYSTEM_EXT into a system_ext image and store it in 219 output_zip.""" 220 221 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", 222 "system_ext.img") 223 if os.path.exists(img.name): 224 logger.info("system_ext.img already exists; no need to rebuild...") 225 return img.name 226 227 block_list = OutputFile( 228 output_zip, OPTIONS.input_tmp, "IMAGES", "system_ext.map") 229 CreateImage( 230 OPTIONS.input_tmp, OPTIONS.info_dict, "system_ext", img, 231 block_list=block_list) 232 return img.name 233 234 235def AddOdm(output_zip): 236 """Turn the contents of ODM into an odm image and store it in output_zip.""" 237 238 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "odm.img") 239 if os.path.exists(img.name): 240 logger.info("odm.img already exists; no need to rebuild...") 241 return img.name 242 243 block_list = OutputFile( 244 output_zip, OPTIONS.input_tmp, "IMAGES", "odm.map") 245 CreateImage( 246 OPTIONS.input_tmp, OPTIONS.info_dict, "odm", img, 247 block_list=block_list) 248 return img.name 249 250 251def AddVendorDlkm(output_zip): 252 """Turn the contents of VENDOR_DLKM into an vendor_dlkm image and store it in output_zip.""" 253 254 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "vendor_dlkm.img") 255 if os.path.exists(img.name): 256 logger.info("vendor_dlkm.img already exists; no need to rebuild...") 257 return img.name 258 259 block_list = OutputFile( 260 output_zip, OPTIONS.input_tmp, "IMAGES", "vendor_dlkm.map") 261 CreateImage( 262 OPTIONS.input_tmp, OPTIONS.info_dict, "vendor_dlkm", img, 263 block_list=block_list) 264 return img.name 265 266 267def AddOdmDlkm(output_zip): 268 """Turn the contents of OdmDlkm into an odm_dlkm image and store it in output_zip.""" 269 270 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "odm_dlkm.img") 271 if os.path.exists(img.name): 272 logger.info("odm_dlkm.img already exists; no need to rebuild...") 273 return img.name 274 275 block_list = OutputFile( 276 output_zip, OPTIONS.input_tmp, "IMAGES", "odm_dlkm.map") 277 CreateImage( 278 OPTIONS.input_tmp, OPTIONS.info_dict, "odm_dlkm", img, 279 block_list=block_list) 280 return img.name 281 282def AddSystemDlkm(output_zip): 283 """Turn the contents of SystemDlkm into an system_dlkm image and store it in output_zip.""" 284 285 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "system_dlkm.img") 286 if os.path.exists(img.name): 287 logger.info("system_dlkm.img already exists; no need to rebuild...") 288 return img.name 289 290 block_list = OutputFile( 291 output_zip, OPTIONS.input_tmp, "IMAGES", "system_dlkm.map") 292 CreateImage( 293 OPTIONS.input_tmp, OPTIONS.info_dict, "system_dlkm", img, 294 block_list=block_list) 295 return img.name 296 297 298def AddDtbo(output_zip): 299 """Adds the DTBO image. 300 301 Uses the image under IMAGES/ if it already exists. Otherwise looks for the 302 image under PREBUILT_IMAGES/, signs it as needed, and returns the image name. 303 """ 304 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "dtbo.img") 305 if os.path.exists(img.name): 306 logger.info("dtbo.img already exists; no need to rebuild...") 307 return img.name 308 309 dtbo_prebuilt_path = os.path.join( 310 OPTIONS.input_tmp, "PREBUILT_IMAGES", "dtbo.img") 311 assert os.path.exists(dtbo_prebuilt_path) 312 shutil.copy(dtbo_prebuilt_path, img.name) 313 314 # AVB-sign the image as needed. 315 if OPTIONS.info_dict.get("avb_enable") == "true": 316 # Signing requires +w 317 os.chmod(img.name, os.stat(img.name).st_mode | stat.S_IWUSR) 318 319 avbtool = OPTIONS.info_dict["avb_avbtool"] 320 part_size = OPTIONS.info_dict["dtbo_size"] 321 # The AVB hash footer will be replaced if already present. 322 cmd = [avbtool, "add_hash_footer", "--image", img.name, 323 "--partition_size", str(part_size), "--partition_name", "dtbo"] 324 common.AppendAVBSigningArgs(cmd, "dtbo") 325 args = OPTIONS.info_dict.get("avb_dtbo_add_hash_footer_args") 326 if args and args.strip(): 327 cmd.extend(shlex.split(args)) 328 common.RunAndCheckOutput(cmd) 329 330 img.Write() 331 return img.name 332 333 334def AddPvmfw(output_zip): 335 """Adds the pvmfw image. 336 337 Uses the image under IMAGES/ if it already exists. Otherwise looks for the 338 image under PREBUILT_IMAGES/, signs it as needed, and returns the image name. 339 """ 340 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "pvmfw.img") 341 if os.path.exists(img.name): 342 logger.info("pvmfw.img already exists; no need to rebuild...") 343 return img.name 344 345 pvmfw_prebuilt_path = os.path.join( 346 OPTIONS.input_tmp, "PREBUILT_IMAGES", "pvmfw.img") 347 assert os.path.exists(pvmfw_prebuilt_path) 348 shutil.copy(pvmfw_prebuilt_path, img.name) 349 350 # AVB-sign the image as needed. 351 if OPTIONS.info_dict.get("avb_enable") == "true": 352 # Signing requires +w 353 os.chmod(img.name, os.stat(img.name).st_mode | stat.S_IWUSR) 354 355 avbtool = OPTIONS.info_dict["avb_avbtool"] 356 part_size = OPTIONS.info_dict["pvmfw_size"] 357 # The AVB hash footer will be replaced if already present. 358 cmd = [avbtool, "add_hash_footer", "--image", img.name, 359 "--partition_size", str(part_size), "--partition_name", "pvmfw"] 360 common.AppendAVBSigningArgs(cmd, "pvmfw") 361 args = OPTIONS.info_dict.get("avb_pvmfw_add_hash_footer_args") 362 if args and args.strip(): 363 cmd.extend(shlex.split(args)) 364 common.RunAndCheckOutput(cmd) 365 366 img.Write() 367 return img.name 368 369 370def AddCustomImages(output_zip, partition_name): 371 """Adds and signs custom images in IMAGES/. 372 373 Args: 374 output_zip: The output zip file (needs to be already open), or None to 375 write images to OPTIONS.input_tmp/. 376 377 Uses the image under IMAGES/ if it already exists. Otherwise looks for the 378 image under PREBUILT_IMAGES/, signs it as needed, and returns the image name. 379 380 Raises: 381 AssertionError: If image can't be found. 382 """ 383 384 key_path = OPTIONS.info_dict.get("avb_{}_key_path".format(partition_name)) 385 algorithm = OPTIONS.info_dict.get("avb_{}_algorithm".format(partition_name)) 386 extra_args = OPTIONS.info_dict.get( 387 "avb_{}_add_hashtree_footer_args".format(partition_name)) 388 partition_size = OPTIONS.info_dict.get( 389 "avb_{}_partition_size".format(partition_name)) 390 391 builder = verity_utils.CreateCustomImageBuilder( 392 OPTIONS.info_dict, partition_name, partition_size, 393 key_path, algorithm, extra_args) 394 395 for img_name in OPTIONS.info_dict.get( 396 "avb_{}_image_list".format(partition_name)).split(): 397 custom_image = OutputFile( 398 output_zip, OPTIONS.input_tmp, "IMAGES", img_name) 399 if os.path.exists(custom_image.name): 400 continue 401 402 custom_image_prebuilt_path = os.path.join( 403 OPTIONS.input_tmp, "PREBUILT_IMAGES", img_name) 404 assert os.path.exists(custom_image_prebuilt_path), \ 405 "Failed to find %s at %s" % (img_name, custom_image_prebuilt_path) 406 407 shutil.copy(custom_image_prebuilt_path, custom_image.name) 408 409 if builder is not None: 410 builder.Build(custom_image.name) 411 412 custom_image.Write() 413 414 default = os.path.join(OPTIONS.input_tmp, "IMAGES", partition_name + ".img") 415 assert os.path.exists(default), \ 416 "There should be one %s.img" % (partition_name) 417 return default 418 419 420def CreateImage(input_dir, info_dict, what, output_file, block_list=None): 421 logger.info("creating %s.img...", what) 422 423 image_props = build_image.ImagePropFromGlobalDict(info_dict, what) 424 image_props["timestamp"] = FIXED_FILE_TIMESTAMP 425 426 if what == "system": 427 fs_config_prefix = "" 428 else: 429 fs_config_prefix = what + "_" 430 431 fs_config = os.path.join( 432 input_dir, "META/" + fs_config_prefix + "filesystem_config.txt") 433 if not os.path.exists(fs_config): 434 fs_config = None 435 436 # Override values loaded from info_dict. 437 if fs_config: 438 image_props["fs_config"] = fs_config 439 if block_list: 440 image_props["block_list"] = block_list.name 441 442 # Use repeatable ext4 FS UUID and hash_seed UUID (based on partition name and 443 # build fingerprint). Also use the legacy build id, because the vbmeta digest 444 # isn't available at this point. 445 build_info = common.BuildInfo(info_dict, use_legacy_id=True) 446 uuid_seed = what + "-" + build_info.GetPartitionFingerprint(what) 447 image_props["uuid"] = str(uuid.uuid5(uuid.NAMESPACE_URL, uuid_seed)) 448 hash_seed = "hash_seed-" + uuid_seed 449 image_props["hash_seed"] = str(uuid.uuid5(uuid.NAMESPACE_URL, hash_seed)) 450 451 build_image.BuildImage( 452 os.path.join(input_dir, what.upper()), image_props, output_file.name) 453 454 output_file.Write() 455 if block_list: 456 block_list.Write() 457 458 # Set the '_image_size' for given image size. 459 is_verity_partition = "verity_block_device" in image_props 460 verity_supported = (image_props.get("verity") == "true" or 461 image_props.get("avb_enable") == "true") 462 is_avb_enable = image_props.get("avb_hashtree_enable") == "true" 463 if verity_supported and (is_verity_partition or is_avb_enable): 464 image_size = image_props.get("image_size") 465 if image_size: 466 image_size_key = what + "_image_size" 467 info_dict[image_size_key] = int(image_size) 468 469 use_dynamic_size = ( 470 info_dict.get("use_dynamic_partition_size") == "true" and 471 what in shlex.split(info_dict.get("dynamic_partition_list", "").strip())) 472 if use_dynamic_size: 473 info_dict.update(build_image.GlobalDictFromImageProp(image_props, what)) 474 475 476def AddUserdata(output_zip): 477 """Create a userdata image and store it in output_zip. 478 479 In most case we just create and store an empty userdata.img; 480 But the invoker can also request to create userdata.img with real 481 data from the target files, by setting "userdata_img_with_data=true" 482 in OPTIONS.info_dict. 483 """ 484 485 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "userdata.img") 486 if os.path.exists(img.name): 487 logger.info("userdata.img already exists; no need to rebuild...") 488 return 489 490 # Skip userdata.img if no size. 491 image_props = build_image.ImagePropFromGlobalDict(OPTIONS.info_dict, "data") 492 if not image_props.get("partition_size"): 493 return 494 495 logger.info("creating userdata.img...") 496 497 image_props["timestamp"] = FIXED_FILE_TIMESTAMP 498 499 if OPTIONS.info_dict.get("userdata_img_with_data") == "true": 500 user_dir = os.path.join(OPTIONS.input_tmp, "DATA") 501 else: 502 user_dir = common.MakeTempDir() 503 504 build_image.BuildImage(user_dir, image_props, img.name) 505 506 common.CheckSize(img.name, "userdata.img", OPTIONS.info_dict) 507 # Always use compression for useradata image. 508 # As it's likely huge and consist of lots of 0s. 509 img.Write(zipfile.ZIP_DEFLATED) 510 511 512def AddVBMeta(output_zip, partitions, name, needed_partitions): 513 """Creates a VBMeta image and stores it in output_zip. 514 515 It generates the requested VBMeta image. The requested image could be for 516 top-level or chained VBMeta image, which is determined based on the name. 517 518 Args: 519 output_zip: The output zip file, which needs to be already open. 520 partitions: A dict that's keyed by partition names with image paths as 521 values. Only valid partition names are accepted, as partitions listed 522 in common.AVB_PARTITIONS and custom partitions listed in 523 OPTIONS.info_dict.get("avb_custom_images_partition_list") 524 name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'. 525 needed_partitions: Partitions whose descriptors should be included into the 526 generated VBMeta image. 527 528 Returns: 529 Path to the created image. 530 531 Raises: 532 AssertionError: On invalid input args. 533 """ 534 assert needed_partitions, "Needed partitions must be specified" 535 536 img = OutputFile( 537 output_zip, OPTIONS.input_tmp, "IMAGES", "{}.img".format(name)) 538 if os.path.exists(img.name): 539 logger.info("%s.img already exists; not rebuilding...", name) 540 return img.name 541 542 common.BuildVBMeta(img.name, partitions, name, needed_partitions) 543 img.Write() 544 return img.name 545 546 547def AddPartitionTable(output_zip): 548 """Create a partition table image and store it in output_zip.""" 549 550 img = OutputFile( 551 output_zip, OPTIONS.input_tmp, "IMAGES", "partition-table.img") 552 bpt = OutputFile( 553 output_zip, OPTIONS.input_tmp, "META", "partition-table.bpt") 554 555 # use BPTTOOL from environ, or "bpttool" if empty or not set. 556 bpttool = os.getenv("BPTTOOL") or "bpttool" 557 cmd = [bpttool, "make_table", "--output_json", bpt.name, 558 "--output_gpt", img.name] 559 input_files_str = OPTIONS.info_dict["board_bpt_input_files"] 560 input_files = input_files_str.split(" ") 561 for i in input_files: 562 cmd.extend(["--input", i]) 563 disk_size = OPTIONS.info_dict.get("board_bpt_disk_size") 564 if disk_size: 565 cmd.extend(["--disk_size", disk_size]) 566 args = OPTIONS.info_dict.get("board_bpt_make_table_args") 567 if args: 568 cmd.extend(shlex.split(args)) 569 common.RunAndCheckOutput(cmd) 570 571 img.Write() 572 bpt.Write() 573 574 575def AddCache(output_zip): 576 """Create an empty cache image and store it in output_zip.""" 577 578 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "cache.img") 579 if os.path.exists(img.name): 580 logger.info("cache.img already exists; no need to rebuild...") 581 return 582 583 image_props = build_image.ImagePropFromGlobalDict(OPTIONS.info_dict, "cache") 584 # The build system has to explicitly request for cache.img. 585 if "fs_type" not in image_props: 586 return 587 588 logger.info("creating cache.img...") 589 590 image_props["timestamp"] = FIXED_FILE_TIMESTAMP 591 592 user_dir = common.MakeTempDir() 593 build_image.BuildImage(user_dir, image_props, img.name) 594 595 common.CheckSize(img.name, "cache.img", OPTIONS.info_dict) 596 img.Write() 597 598 599def CheckAbOtaImages(output_zip, ab_partitions): 600 """Checks that all the listed A/B partitions have their images available. 601 602 The images need to be available under IMAGES/ or RADIO/, with the former takes 603 a priority. 604 605 Args: 606 output_zip: The output zip file (needs to be already open), or None to 607 find images in OPTIONS.input_tmp/. 608 ab_partitions: The list of A/B partitions. 609 610 Raises: 611 AssertionError: If it can't find an image. 612 """ 613 for partition in ab_partitions: 614 img_name = partition + ".img" 615 616 # Assert that the image is present under IMAGES/ now. 617 if output_zip: 618 # Zip spec says: All slashes MUST be forward slashes. 619 images_path = "IMAGES/" + img_name 620 radio_path = "RADIO/" + img_name 621 available = (images_path in output_zip.namelist() or 622 radio_path in output_zip.namelist()) 623 else: 624 images_path = os.path.join(OPTIONS.input_tmp, "IMAGES", img_name) 625 radio_path = os.path.join(OPTIONS.input_tmp, "RADIO", img_name) 626 available = os.path.exists(images_path) or os.path.exists(radio_path) 627 628 assert available, "Failed to find " + img_name 629 630 631def AddPackRadioImages(output_zip, images): 632 """Copies images listed in META/pack_radioimages.txt from RADIO/ to IMAGES/. 633 634 Args: 635 output_zip: The output zip file (needs to be already open), or None to 636 write images to OPTIONS.input_tmp/. 637 images: A list of image names. 638 639 Raises: 640 AssertionError: If a listed image can't be found. 641 """ 642 for image in images: 643 img_name = image.strip() 644 _, ext = os.path.splitext(img_name) 645 if not ext: 646 img_name += ".img" 647 648 prebuilt_path = os.path.join(OPTIONS.input_tmp, "IMAGES", img_name) 649 if os.path.exists(prebuilt_path): 650 logger.info("%s already exists, no need to overwrite...", img_name) 651 continue 652 653 img_radio_path = os.path.join(OPTIONS.input_tmp, "RADIO", img_name) 654 assert os.path.exists(img_radio_path), \ 655 "Failed to find %s at %s" % (img_name, img_radio_path) 656 657 if output_zip: 658 common.ZipWrite(output_zip, img_radio_path, "IMAGES/" + img_name) 659 else: 660 shutil.copy(img_radio_path, prebuilt_path) 661 662 663def AddSuperEmpty(output_zip): 664 """Create a super_empty.img and store it in output_zip.""" 665 666 img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "super_empty.img") 667 build_super_image.BuildSuperImage(OPTIONS.info_dict, img.name) 668 img.Write() 669 670 671def AddSuperSplit(output_zip): 672 """Create split super_*.img and store it in output_zip.""" 673 674 outdir = os.path.join(OPTIONS.input_tmp, "OTA") 675 built = build_super_image.BuildSuperImage(OPTIONS.input_tmp, outdir) 676 677 if built: 678 for dev in OPTIONS.info_dict['super_block_devices'].strip().split(): 679 img = OutputFile(output_zip, OPTIONS.input_tmp, "OTA", 680 "super_" + dev + ".img") 681 img.Write() 682 683 684def ReplaceUpdatedFiles(zip_filename, files_list): 685 """Updates all the ZIP entries listed in files_list. 686 687 For now the list includes META/care_map.pb, and the related files under 688 SYSTEM/ after rebuilding recovery. 689 """ 690 common.ZipDelete(zip_filename, files_list) 691 output_zip = zipfile.ZipFile(zip_filename, "a", 692 compression=zipfile.ZIP_DEFLATED, 693 allowZip64=True) 694 for item in files_list: 695 file_path = os.path.join(OPTIONS.input_tmp, item) 696 assert os.path.exists(file_path) 697 common.ZipWrite(output_zip, file_path, arcname=item) 698 common.ZipClose(output_zip) 699 700 701def HasPartition(partition_name): 702 """Determines if the target files archive should build a given partition.""" 703 704 return ((os.path.isdir( 705 os.path.join(OPTIONS.input_tmp, partition_name.upper())) and 706 OPTIONS.info_dict.get( 707 "building_{}_image".format(partition_name)) == "true") or 708 os.path.exists( 709 os.path.join(OPTIONS.input_tmp, "IMAGES", 710 "{}.img".format(partition_name)))) 711 712 713def AddApexInfo(output_zip): 714 apex_infos = GetApexInfoFromTargetFiles(OPTIONS.input_tmp, 'system', 715 compressed_only=False) 716 apex_metadata_proto = ota_metadata_pb2.ApexMetadata() 717 apex_metadata_proto.apex_info.extend(apex_infos) 718 apex_info_bytes = apex_metadata_proto.SerializeToString() 719 720 output_file = os.path.join(OPTIONS.input_tmp, "META", "apex_info.pb") 721 with open(output_file, "wb") as ofile: 722 ofile.write(apex_info_bytes) 723 if output_zip: 724 arc_name = "META/apex_info.pb" 725 if arc_name in output_zip.namelist(): 726 OPTIONS.replace_updated_files_list.append(arc_name) 727 else: 728 common.ZipWrite(output_zip, output_file, arc_name) 729 730 731def AddVbmetaDigest(output_zip): 732 """Write the vbmeta digest to the output dir and zipfile.""" 733 734 # Calculate the vbmeta digest and put the result in to META/ 735 boot_images = OPTIONS.info_dict.get("boot_images") 736 # Disable the digest calculation if the target_file is used as a container 737 # for boot images. A boot container might contain boot-5.4.img, boot-5.10.img 738 # etc., instead of just a boot.img and will fail in vbmeta digest calculation. 739 boot_container = boot_images and ( 740 len(boot_images.split()) >= 2 or boot_images.split()[0] != 'boot.img') 741 if (OPTIONS.info_dict.get("avb_enable") == "true" and not boot_container and 742 OPTIONS.info_dict.get("avb_building_vbmeta_image") == "true"): 743 avbtool = OPTIONS.info_dict["avb_avbtool"] 744 digest = verity_utils.CalculateVbmetaDigest(OPTIONS.input_tmp, avbtool) 745 vbmeta_digest_txt = os.path.join(OPTIONS.input_tmp, "META", 746 "vbmeta_digest.txt") 747 with open(vbmeta_digest_txt, 'w') as f: 748 f.write(digest) 749 # writes to the output zipfile 750 if output_zip: 751 arc_name = "META/vbmeta_digest.txt" 752 if arc_name in output_zip.namelist(): 753 OPTIONS.replace_updated_files_list.append(arc_name) 754 else: 755 common.ZipWriteStr(output_zip, arc_name, digest) 756 757 758def AddImagesToTargetFiles(filename): 759 """Creates and adds images (boot/recovery/system/...) to a target_files.zip. 760 761 It works with either a zip file (zip mode), or a directory that contains the 762 files to be packed into a target_files.zip (dir mode). The latter is used when 763 being called from build/make/core/Makefile. 764 765 The images will be created under IMAGES/ in the input target_files.zip. 766 767 Args: 768 filename: the target_files.zip, or the zip root directory. 769 """ 770 if os.path.isdir(filename): 771 OPTIONS.input_tmp = os.path.abspath(filename) 772 else: 773 OPTIONS.input_tmp = common.UnzipTemp(filename) 774 775 if not OPTIONS.add_missing: 776 if os.path.isdir(os.path.join(OPTIONS.input_tmp, "IMAGES")): 777 logger.warning("target_files appears to already contain images.") 778 sys.exit(1) 779 780 OPTIONS.info_dict = common.LoadInfoDict(OPTIONS.input_tmp, repacking=True) 781 782 has_recovery = OPTIONS.info_dict.get("no_recovery") != "true" 783 has_boot = OPTIONS.info_dict.get("no_boot") != "true" 784 has_init_boot = OPTIONS.info_dict.get("init_boot") == "true" 785 has_vendor_boot = OPTIONS.info_dict.get("vendor_boot") == "true" 786 has_vendor_kernel_boot = OPTIONS.info_dict.get("vendor_kernel_boot") == "true" 787 788 # {vendor,odm,product,system_ext,vendor_dlkm,odm_dlkm, system_dlkm, system, system_other}.img 789 # can be built from source, or dropped into target_files.zip as a prebuilt blob. 790 has_vendor = HasPartition("vendor") 791 has_odm = HasPartition("odm") 792 has_vendor_dlkm = HasPartition("vendor_dlkm") 793 has_odm_dlkm = HasPartition("odm_dlkm") 794 has_system_dlkm = HasPartition("system_dlkm") 795 has_product = HasPartition("product") 796 has_system_ext = HasPartition("system_ext") 797 has_system = HasPartition("system") 798 has_system_other = HasPartition("system_other") 799 has_userdata = OPTIONS.info_dict.get("building_userdata_image") == "true" 800 has_cache = OPTIONS.info_dict.get("building_cache_image") == "true" 801 802 # Set up the output destination. It writes to the given directory for dir 803 # mode; otherwise appends to the given ZIP. 804 if os.path.isdir(filename): 805 output_zip = None 806 else: 807 output_zip = zipfile.ZipFile(filename, "a", 808 compression=zipfile.ZIP_DEFLATED, 809 allowZip64=True) 810 811 # Always make input_tmp/IMAGES available, since we may stage boot / recovery 812 # images there even under zip mode. The directory will be cleaned up as part 813 # of OPTIONS.input_tmp. 814 images_dir = os.path.join(OPTIONS.input_tmp, "IMAGES") 815 if not os.path.isdir(images_dir): 816 os.makedirs(images_dir) 817 818 # A map between partition names and their paths, which could be used when 819 # generating AVB vbmeta image. 820 partitions = {} 821 822 def banner(s): 823 logger.info("\n\n++++ %s ++++\n\n", s) 824 825 boot_image = None 826 if has_boot: 827 banner("boot") 828 boot_images = OPTIONS.info_dict.get("boot_images") 829 if boot_images is None: 830 boot_images = "boot.img" 831 for index, b in enumerate(boot_images.split()): 832 # common.GetBootableImage() returns the image directly if present. 833 boot_image = common.GetBootableImage( 834 "IMAGES/" + b, b, OPTIONS.input_tmp, "BOOT") 835 # boot.img may be unavailable in some targets (e.g. aosp_arm64). 836 if boot_image: 837 boot_image_path = os.path.join(OPTIONS.input_tmp, "IMAGES", b) 838 # Although multiple boot images can be generated, include the image 839 # descriptor of only the first boot image in vbmeta 840 if index == 0: 841 partitions['boot'] = boot_image_path 842 if not os.path.exists(boot_image_path): 843 boot_image.WriteToDir(OPTIONS.input_tmp) 844 if output_zip: 845 boot_image.AddToZip(output_zip) 846 847 if has_init_boot: 848 banner("init_boot") 849 init_boot_image = common.GetBootableImage( 850 "IMAGES/init_boot.img", "init_boot.img", OPTIONS.input_tmp, "INIT_BOOT") 851 if init_boot_image: 852 partitions['init_boot'] = os.path.join( 853 OPTIONS.input_tmp, "IMAGES", "init_boot.img") 854 if not os.path.exists(partitions['init_boot']): 855 init_boot_image.WriteToDir(OPTIONS.input_tmp) 856 if output_zip: 857 init_boot_image.AddToZip(output_zip) 858 859 if has_vendor_boot: 860 banner("vendor_boot") 861 vendor_boot_image = common.GetVendorBootImage( 862 "IMAGES/vendor_boot.img", "vendor_boot.img", OPTIONS.input_tmp, 863 "VENDOR_BOOT") 864 if vendor_boot_image: 865 partitions['vendor_boot'] = os.path.join(OPTIONS.input_tmp, "IMAGES", 866 "vendor_boot.img") 867 if not os.path.exists(partitions['vendor_boot']): 868 vendor_boot_image.WriteToDir(OPTIONS.input_tmp) 869 if output_zip: 870 vendor_boot_image.AddToZip(output_zip) 871 872 if has_vendor_kernel_boot: 873 banner("vendor_kernel_boot") 874 vendor_kernel_boot_image = common.GetVendorKernelBootImage( 875 "IMAGES/vendor_kernel_boot.img", "vendor_kernel_boot.img", OPTIONS.input_tmp, 876 "VENDOR_KERNEL_BOOT") 877 if vendor_kernel_boot_image: 878 partitions['vendor_kernel_boot'] = os.path.join(OPTIONS.input_tmp, "IMAGES", 879 "vendor_kernel_boot.img") 880 if not os.path.exists(partitions['vendor_kernel_boot']): 881 vendor_kernel_boot_image.WriteToDir(OPTIONS.input_tmp) 882 if output_zip: 883 vendor_kernel_boot_image.AddToZip(output_zip) 884 885 recovery_image = None 886 if has_recovery: 887 banner("recovery") 888 recovery_image = common.GetBootableImage( 889 "IMAGES/recovery.img", "recovery.img", OPTIONS.input_tmp, "RECOVERY") 890 assert recovery_image, "Failed to create recovery.img." 891 partitions['recovery'] = os.path.join( 892 OPTIONS.input_tmp, "IMAGES", "recovery.img") 893 if not os.path.exists(partitions['recovery']): 894 recovery_image.WriteToDir(OPTIONS.input_tmp) 895 if output_zip: 896 recovery_image.AddToZip(output_zip) 897 898 banner("recovery (two-step image)") 899 # The special recovery.img for two-step package use. 900 recovery_two_step_image = common.GetBootableImage( 901 "OTA/recovery-two-step.img", "recovery-two-step.img", 902 OPTIONS.input_tmp, "RECOVERY", two_step_image=True) 903 assert recovery_two_step_image, "Failed to create recovery-two-step.img." 904 recovery_two_step_image_path = os.path.join( 905 OPTIONS.input_tmp, "OTA", "recovery-two-step.img") 906 if not os.path.exists(recovery_two_step_image_path): 907 recovery_two_step_image.WriteToDir(OPTIONS.input_tmp) 908 if output_zip: 909 recovery_two_step_image.AddToZip(output_zip) 910 911 def add_partition(partition, has_partition, add_func, add_args): 912 if has_partition: 913 banner(partition) 914 partitions[partition] = add_func(output_zip, *add_args) 915 916 add_partition_calls = ( 917 ("system", has_system, AddSystem, [recovery_image, boot_image]), 918 ("vendor", has_vendor, AddVendor, [recovery_image, boot_image]), 919 ("product", has_product, AddProduct, []), 920 ("system_ext", has_system_ext, AddSystemExt, []), 921 ("odm", has_odm, AddOdm, []), 922 ("vendor_dlkm", has_vendor_dlkm, AddVendorDlkm, []), 923 ("odm_dlkm", has_odm_dlkm, AddOdmDlkm, []), 924 ("system_dlkm", has_system_dlkm, AddSystemDlkm, []), 925 ("system_other", has_system_other, AddSystemOther, []), 926 ) 927 for call in add_partition_calls: 928 add_partition(*call) 929 930 AddApexInfo(output_zip) 931 932 if not OPTIONS.is_signing: 933 banner("userdata") 934 AddUserdata(output_zip) 935 banner("cache") 936 AddCache(output_zip) 937 938 if OPTIONS.info_dict.get("board_bpt_enable") == "true": 939 banner("partition-table") 940 AddPartitionTable(output_zip) 941 942 add_partition("dtbo", 943 OPTIONS.info_dict.get("has_dtbo") == "true", AddDtbo, []) 944 add_partition("pvmfw", 945 OPTIONS.info_dict.get("has_pvmfw") == "true", AddPvmfw, []) 946 947 # Custom images. 948 custom_partitions = OPTIONS.info_dict.get( 949 "avb_custom_images_partition_list", "").strip().split() 950 for partition_name in custom_partitions: 951 partition_name = partition_name.strip() 952 banner("custom images for " + partition_name) 953 partitions[partition_name] = AddCustomImages(output_zip, partition_name) 954 955 if OPTIONS.info_dict.get("avb_enable") == "true": 956 # vbmeta_partitions includes the partitions that should be included into 957 # top-level vbmeta.img, which are the ones that are not included in any 958 # chained VBMeta image plus the chained VBMeta images themselves. 959 # Currently custom_partitions are all chained to VBMeta image. 960 vbmeta_partitions = common.AVB_PARTITIONS[:] + tuple(custom_partitions) 961 962 vbmeta_system = OPTIONS.info_dict.get("avb_vbmeta_system", "").strip() 963 if vbmeta_system: 964 banner("vbmeta_system") 965 partitions["vbmeta_system"] = AddVBMeta( 966 output_zip, partitions, "vbmeta_system", vbmeta_system.split()) 967 vbmeta_partitions = [ 968 item for item in vbmeta_partitions 969 if item not in vbmeta_system.split()] 970 vbmeta_partitions.append("vbmeta_system") 971 972 vbmeta_vendor = OPTIONS.info_dict.get("avb_vbmeta_vendor", "").strip() 973 if vbmeta_vendor: 974 banner("vbmeta_vendor") 975 partitions["vbmeta_vendor"] = AddVBMeta( 976 output_zip, partitions, "vbmeta_vendor", vbmeta_vendor.split()) 977 vbmeta_partitions = [ 978 item for item in vbmeta_partitions 979 if item not in vbmeta_vendor.split()] 980 vbmeta_partitions.append("vbmeta_vendor") 981 982 if OPTIONS.info_dict.get("avb_building_vbmeta_image") == "true": 983 banner("vbmeta") 984 AddVBMeta(output_zip, partitions, "vbmeta", vbmeta_partitions) 985 986 if OPTIONS.info_dict.get("use_dynamic_partitions") == "true": 987 if OPTIONS.info_dict.get("build_super_empty_partition") == "true": 988 banner("super_empty") 989 AddSuperEmpty(output_zip) 990 991 if OPTIONS.info_dict.get("build_super_partition") == "true": 992 if OPTIONS.info_dict.get( 993 "build_retrofit_dynamic_partitions_ota_package") == "true": 994 banner("super split images") 995 AddSuperSplit(output_zip) 996 997 banner("radio") 998 ab_partitions_txt = os.path.join(OPTIONS.input_tmp, "META", 999 "ab_partitions.txt") 1000 if os.path.exists(ab_partitions_txt): 1001 with open(ab_partitions_txt) as f: 1002 ab_partitions = f.read().splitlines() 1003 1004 # For devices using A/B update, make sure we have all the needed images 1005 # ready under IMAGES/ or RADIO/. 1006 CheckAbOtaImages(output_zip, ab_partitions) 1007 1008 # Generate care_map.pb for ab_partitions, then write this file to 1009 # target_files package. 1010 output_care_map = os.path.join(OPTIONS.input_tmp, "META", "care_map.pb") 1011 AddCareMapForAbOta(output_zip if output_zip else output_care_map, 1012 ab_partitions, partitions) 1013 1014 # Radio images that need to be packed into IMAGES/, and product-img.zip. 1015 pack_radioimages_txt = os.path.join( 1016 OPTIONS.input_tmp, "META", "pack_radioimages.txt") 1017 if os.path.exists(pack_radioimages_txt): 1018 with open(pack_radioimages_txt) as f: 1019 AddPackRadioImages(output_zip, f.readlines()) 1020 1021 AddVbmetaDigest(output_zip) 1022 1023 if output_zip: 1024 common.ZipClose(output_zip) 1025 if OPTIONS.replace_updated_files_list: 1026 ReplaceUpdatedFiles(output_zip.filename, 1027 OPTIONS.replace_updated_files_list) 1028 1029 1030def OptimizeCompressedEntries(zipfile_path): 1031 """Convert files that do not compress well to uncompressed storage 1032 1033 EROFS images tend to be compressed already, so compressing them again 1034 yields little space savings. Leaving them uncompressed will make 1035 downstream tooling's job easier, and save compute time. 1036 """ 1037 if not zipfile.is_zipfile(zipfile_path): 1038 return 1039 entries_to_store = [] 1040 with tempfile.TemporaryDirectory() as tmpdir: 1041 with zipfile.ZipFile(zipfile_path, "r", allowZip64=True) as zfp: 1042 for zinfo in zfp.filelist: 1043 if not zinfo.filename.startswith("IMAGES/") and not zinfo.filename.startswith("META"): 1044 continue 1045 # Don't try to store userdata.img uncompressed, it's usually huge. 1046 if zinfo.filename.endswith("userdata.img"): 1047 continue 1048 if zinfo.compress_size > zinfo.file_size * 0.80 and zinfo.compress_type != zipfile.ZIP_STORED: 1049 entries_to_store.append(zinfo) 1050 zfp.extract(zinfo, tmpdir) 1051 if len(entries_to_store) == 0: 1052 return 1053 # Remove these entries, then re-add them as ZIP_STORED 1054 ZipDelete(zipfile_path, [entry.filename for entry in entries_to_store]) 1055 with zipfile.ZipFile(zipfile_path, "a", allowZip64=True) as zfp: 1056 for entry in entries_to_store: 1057 zfp.write(os.path.join(tmpdir, entry.filename), entry.filename, compress_type=zipfile.ZIP_STORED) 1058 1059 1060def main(argv): 1061 def option_handler(o, a): 1062 if o in ("-a", "--add_missing"): 1063 OPTIONS.add_missing = True 1064 elif o in ("-r", "--rebuild_recovery",): 1065 OPTIONS.rebuild_recovery = True 1066 elif o == "--replace_verity_private_key": 1067 OPTIONS.replace_verity_private_key = (True, a) 1068 elif o == "--replace_verity_public_key": 1069 OPTIONS.replace_verity_public_key = (True, a) 1070 elif o == "--is_signing": 1071 OPTIONS.is_signing = True 1072 else: 1073 return False 1074 return True 1075 1076 args = common.ParseOptions( 1077 argv, __doc__, extra_opts="ar", 1078 extra_long_opts=["add_missing", "rebuild_recovery", 1079 "replace_verity_public_key=", 1080 "replace_verity_private_key=", 1081 "is_signing"], 1082 extra_option_handler=option_handler) 1083 1084 if len(args) != 1: 1085 common.Usage(__doc__) 1086 sys.exit(1) 1087 1088 common.InitLogging() 1089 1090 AddImagesToTargetFiles(args[0]) 1091 OptimizeCompressedEntries(args[0]) 1092 logger.info("done.") 1093 1094 1095if __name__ == '__main__': 1096 try: 1097 common.CloseInheritedPipes() 1098 main(sys.argv[1:]) 1099 finally: 1100 common.Cleanup() 1101