#!/usr/bin/env python
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for inspecting, re-signing and repacking APEX files."""

import logging
import os.path
import re
import shlex
import shutil
import zipfile

import apex_manifest
import common
from common import UnzipTemp, RunAndCheckOutput, MakeTempFile, OPTIONS

import ota_metadata_pb2


logger = logging.getLogger(__name__)

OPTIONS = common.OPTIONS

APEX_PAYLOAD_IMAGE = 'apex_payload.img'

APEX_PUBKEY = 'apex_pubkey'

# Extracts the Algorithm / Hash Algorithm / Salt / Prop info / Tree size lines
# from the `avbtool info_image` output of a payload. For example,
# Algorithm:                SHA256_RSA4096
_PAYLOAD_INFO_MATCHER = re.compile(
    r'^\s*(?P<key>Algorithm|Hash Algorithm|Salt|Prop|Tree Size)\:\s*(?P<value>.*?)$')

# Further extracts the property key-value pair from a 'Prop:' line. For
# example,
# Prop: apex.key -> 'com.android.runtime'
# Note that avbtool writes single or double quotes around values.
_PROPERTY_DESCRIPTOR_MATCHER = re.compile(
    r'^\s*(?P<key>.*?)\s->\s*(?P<value>.*?)$')


class ApexInfoError(Exception):
  """An Exception raised during Apex Information command."""

  def __init__(self, message):
    Exception.__init__(self, message)


class ApexSigningError(Exception):
  """An Exception raised during Apex Payload signing."""

  def __init__(self, message):
    Exception.__init__(self, message)


class ApexApkSigner(object):
  """Class to sign the apk files in a apex payload image and repack the apex"""

  def __init__(self, apex_path, key_passwords, codename_to_api_level_map):
    """Stores the signing parameters.

    Args:
      apex_path: Path to the apex file to process.
      key_passwords: A dict queried with `.get(key_name)` for the password of
          each apk signing key. A falsy value is replaced by an empty dict.
      codename_to_api_level_map: Forwarded unchanged to common.SignFile.
    """
    self.apex_path = apex_path
    self.key_passwords = key_passwords if key_passwords else dict()
    self.codename_to_api_level_map = codename_to_api_level_map
    self.debugfs_path = os.path.join(
        OPTIONS.search_path, "bin", "debugfs_static")

  def _CheckDebugfsPath(self):
    """Raises ApexSigningError if self.debugfs_path does not exist."""
    if not os.path.exists(self.debugfs_path):
      raise ApexSigningError(
          "Couldn't find location of debugfs_static: " +
          "Path {} does not exist. ".format(self.debugfs_path) +
          "Make sure bin/debugfs_static can be found in -p <path>")

  def ProcessApexFile(self, apk_keys, payload_key, signing_args=None):
    """Scans and signs the apk files and repack the apex

    Args:
      apk_keys: A dict that holds the signing keys for apk files, keyed by the
          apk's base name.
      payload_key: The key handed to apexer ('--key') when the payload image
          is rebuilt.
      signing_args: Optional extra arguments handed to apexer through
          '--signing_args'.

    Returns:
      The path of the apex file (self.apex_path). The file is only rewritten
      when at least one apk has been signed.

    Raises:
      ApexSigningError: If debugfs_static is missing, or an apk in the payload
          has no entry in apk_keys.
    """
    self._CheckDebugfsPath()
    list_cmd = ['deapexer', '--debugfs_path',
                self.debugfs_path, 'list', self.apex_path]
    entries_names = common.RunAndCheckOutput(list_cmd).split()
    apk_entries = [name for name in entries_names if name.endswith('.apk')]

    # No need to sign and repack, return the original apex path.
    if not apk_entries:
      logger.info('No apk file to sign in %s', self.apex_path)
      return self.apex_path

    for entry in apk_entries:
      apk_name = os.path.basename(entry)
      if apk_name not in apk_keys:
        raise ApexSigningError('Failed to find signing keys for apk file {} in'
                               ' apex {}.  Use "-e <apkname>=" to specify a key'
                               .format(entry, self.apex_path))
      if not any(dirname in entry for dirname in ['app/', 'priv-app/',
                                                  'overlay/']):
        logger.warning('Apk path does not contain the intended directory name:'
                       ' %s', entry)

    payload_dir, has_signed_apk = self.ExtractApexPayloadAndSignApks(
        apk_entries, apk_keys)
    if not has_signed_apk:
      logger.info('No apk file has been signed in %s', self.apex_path)
      return self.apex_path

    return self.RepackApexPayload(payload_dir, payload_key, signing_args)

  def ExtractApexPayloadAndSignApks(self, apk_entries, apk_keys):
    """Extracts the payload image and signs the containing apk files.

    Args:
      apk_entries: Paths (relative to the payload root) of the apk files to
          sign.
      apk_keys: A dict that maps an apk base name to its signing key name.

    Returns:
      A tuple (payload_dir, has_signed_apk): the temp directory holding the
      extracted payload, and whether any apk has actually been signed.

    Raises:
      ApexSigningError: If debugfs_static is missing, or an apk that needs
          signing is not found in the extracted payload.
    """
    self._CheckDebugfsPath()
    payload_dir = common.MakeTempDir()
    extract_cmd = ['deapexer', '--debugfs_path',
                   self.debugfs_path, 'extract', self.apex_path, payload_dir]
    common.RunAndCheckOutput(extract_cmd)

    has_signed_apk = False
    for entry in apk_entries:
      apk_path = os.path.join(payload_dir, entry)

      key_name = apk_keys.get(os.path.basename(entry))
      if key_name in common.SPECIAL_CERT_STRINGS:
        logger.info('Not signing: %s due to special cert string', apk_path)
        continue

      # Check the extracted apk itself (the apex path was necessarily valid
      # for the extraction above to succeed), so that a missing entry gives a
      # clear error instead of an OSError from os.rename().
      if not os.path.exists(apk_path):
        raise ApexSigningError(
            'Failed to find apk file {} extracted from apex {}'.format(
                apk_path, self.apex_path))

      logger.info('Signing apk file %s in apex %s', apk_path, self.apex_path)
      # Rename the unsigned apk and overwrite the original apk path with the
      # signed apk file.
      unsigned_apk = common.MakeTempFile()
      os.rename(apk_path, unsigned_apk)
      common.SignFile(
          unsigned_apk, apk_path, key_name, self.key_passwords.get(key_name),
          codename_to_api_level_map=self.codename_to_api_level_map)
      has_signed_apk = True
    return payload_dir, has_signed_apk

  def RepackApexPayload(self, payload_dir, payload_key, signing_args=None):
    """Rebuilds the apex file with the updated payload directory.

    Args:
      payload_dir: Directory holding the (modified) payload contents.
      payload_key: The key handed to apexer through '--key'.
      signing_args: Optional extra arguments handed to apexer through
          '--signing_args'.

    Returns:
      The path of the apex file (self.apex_path), whose payload image entry
      has been replaced in place.
    """
    apex_dir = common.MakeTempDir()
    # Extract the apex file and reuse its meta files as repack parameters.
    common.UnzipToDir(self.apex_path, apex_dir)
    arguments_dict = {
        'manifest': os.path.join(apex_dir, 'apex_manifest.pb'),
        'build_info': os.path.join(apex_dir, 'apex_build_info.pb'),
        'key': payload_key,
    }
    for filename in arguments_dict.values():
      assert os.path.exists(filename), 'file {} not found'.format(filename)

    # The repack process will add back these files later in the payload image.
    for name in ['apex_manifest.pb', 'apex_manifest.json', 'lost+found']:
      path = os.path.join(payload_dir, name)
      if os.path.isfile(path):
        os.remove(path)
      elif os.path.isdir(path):
        shutil.rmtree(path)

    # TODO(xunchang) the signing process can be improved by using
    # '--unsigned_payload_only'. But we need to parse the vbmeta earlier for
    # the signing arguments, e.g. algorithm, salt, etc.
    payload_img = os.path.join(apex_dir, APEX_PAYLOAD_IMAGE)
    generate_image_cmd = ['apexer', '--force', '--payload_only',
                          '--do_not_check_keyname', '--apexer_tool_path',
                          os.getenv('PATH')]
    for key, val in arguments_dict.items():
      generate_image_cmd.extend(['--' + key, val])

    # Add quote to the signing_args as we will pass
    # --signing_args "--signing_helper_with_files=%path" to apexer
    if signing_args:
      generate_image_cmd.extend(
          ['--signing_args', '"{}"'.format(signing_args)])

    # optional arguments for apex repacking
    manifest_json = os.path.join(apex_dir, 'apex_manifest.json')
    if os.path.exists(manifest_json):
      generate_image_cmd.extend(['--manifest_json', manifest_json])
    generate_image_cmd.extend([payload_dir, payload_img])
    if OPTIONS.verbose:
      generate_image_cmd.append('-v')
    common.RunAndCheckOutput(generate_image_cmd)

    # Add the payload image back to the apex file.
    common.ZipDelete(self.apex_path, APEX_PAYLOAD_IMAGE)
    with zipfile.ZipFile(self.apex_path, 'a', allowZip64=True) as output_apex:
      common.ZipWrite(output_apex, payload_img, APEX_PAYLOAD_IMAGE,
                      compress_type=zipfile.ZIP_STORED)
    return self.apex_path


def SignApexPayload(avbtool, payload_file, payload_key_path, payload_key_name,
                    algorithm, salt, hash_algorithm, no_hashtree, signing_args=None):
  """Signs a given payload_file with the payload key.

  Args:
    avbtool: The AVB tool to use.
    payload_file: Path to the payload image; it is signed in place.
    payload_key_path: Path to the payload signing key.
    payload_key_name: Value stored in the 'apex.key' property.
    algorithm: Value for avbtool's '--algorithm'.
    salt: Value for avbtool's '--salt'.
    hash_algorithm: Value for avbtool's '--hash_algorithm'.
    no_hashtree: If true, '--no_hashtree' is passed to avbtool.
    signing_args: Optional string of extra avbtool arguments; it is split with
        shlex before being appended.

  Raises:
    ApexSigningError: If signing or the subsequent verification fails.
  """
  # Add the new footer. Old footer, if any, will be replaced by avbtool.
  cmd = [avbtool, 'add_hashtree_footer',
         '--do_not_generate_fec',
         '--algorithm', algorithm,
         '--key', payload_key_path,
         '--prop', 'apex.key:{}'.format(payload_key_name),
         '--image', payload_file,
         '--salt', salt,
         '--hash_algorithm', hash_algorithm]
  if no_hashtree:
    cmd.append('--no_hashtree')
  if signing_args:
    cmd.extend(shlex.split(signing_args))

  try:
    common.RunAndCheckOutput(cmd)
  except common.ExternalError as e:
    raise ApexSigningError(
        'Failed to sign APEX payload {} with {}:\n{}'.format(
            payload_file, payload_key_path, e))

  # Verify the signed payload image with specified public key.
  logger.info('Verifying %s', payload_file)
  VerifyApexPayload(avbtool, payload_file, payload_key_path, no_hashtree)


def VerifyApexPayload(avbtool, payload_file, payload_key, no_hashtree=False):
  """Verifies the APEX payload signature with the given key.

  Args:
    avbtool: The AVB tool to use.
    payload_file: Path to the payload image.
    payload_key: Path to the key handed to avbtool's '--key'.
    no_hashtree: If true, '--accept_zeroed_hashtree' is passed to avbtool.

  Raises:
    ApexSigningError: If avbtool fails to verify the image.
  """
  cmd = [avbtool, 'verify_image', '--image', payload_file,
         '--key', payload_key]
  if no_hashtree:
    cmd.append('--accept_zeroed_hashtree')
  try:
    common.RunAndCheckOutput(cmd)
  except common.ExternalError as e:
    raise ApexSigningError(
        'Failed to validate payload signing for {} with {}:\n{}'.format(
            payload_file, payload_key, e))


def ParseApexPayloadInfo(avbtool, payload_path):
  """Parses the APEX payload info.

  Args:
    avbtool: The AVB tool to use.
    payload_path: The path to the payload image.

  Raises:
    ApexInfoError on parsing errors.

  Returns:
    A dict that contains payload property-value pairs. The dict contains at
    least Algorithm, Hash Algorithm, Salt and apex.key; Tree Size is included
    when avbtool reports it. All values are the strings printed by avbtool.
  """
  if not os.path.exists(payload_path):
    raise ApexInfoError('Failed to find image: {}'.format(payload_path))

  cmd = [avbtool, 'info_image', '--image', payload_path]
  try:
    output = common.RunAndCheckOutput(cmd)
  except common.ExternalError as e:
    raise ApexInfoError(
        'Failed to get APEX payload info for {}:\n{}'.format(
            payload_path, e))

  payload_info = {}
  for line in output.split('\n'):
    line_info = _PAYLOAD_INFO_MATCHER.match(line)
    if not line_info:
      continue

    key, value = line_info.group('key'), line_info.group('value')

    if key == 'Prop':
      prop = _PROPERTY_DESCRIPTOR_MATCHER.match(value)
      if not prop:
        raise ApexInfoError(
            'Failed to parse prop string {}'.format(value))

      prop_key, prop_value = prop.group('key'), prop.group('value')
      if prop_key == 'apex.key':
        # avbtool dumps the prop value with repr(), which contains single /
        # double quotes that we don't want.
        payload_info[prop_key] = prop_value.strip('\"\'')

    else:
      payload_info[key] = value

  # Validation check.
  for key in ('Algorithm', 'Salt', 'apex.key', 'Hash Algorithm'):
    if key not in payload_info:
      raise ApexInfoError(
          'Failed to find {} prop in {}'.format(key, payload_path))

  return payload_info


def _PayloadHasNoHashtree(payload_info):
  """Returns True if the parsed payload info reports an empty hashtree.

  ParseApexPayloadInfo() stores 'Tree Size' as the text printed by avbtool
  (e.g. '0 bytes'), so the leading integer has to be parsed rather than
  comparing the string against 0. A missing entry counts as no hashtree; an
  unparsable one conservatively counts as having a hashtree.
  """
  tree_size = payload_info.get('Tree Size', 0)
  if isinstance(tree_size, int):
    return tree_size == 0
  try:
    return int(str(tree_size).split()[0]) == 0
  except (IndexError, ValueError):
    return False


def _SignApexContainer(aligned_apex, container_key, container_pw,
                       codename_to_api_level_map, suffix):
  """Signs an aligned APEX container and returns the path of the signed copy."""
  signed_apex = common.MakeTempFile(prefix='apex-container-', suffix=suffix)

  # Specify the 4K alignment when calling SignApk.
  extra_signapk_args = OPTIONS.extra_signapk_args[:]
  extra_signapk_args.extend(['-a', '4096'])

  password = container_pw.get(container_key) if container_pw else None
  common.SignFile(
      aligned_apex,
      signed_apex,
      container_key,
      password,
      codename_to_api_level_map=codename_to_api_level_map,
      extra_signapk_args=extra_signapk_args)

  return signed_apex


def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
                         container_pw, apk_keys, codename_to_api_level_map,
                         no_hashtree, signing_args=None):
  """Signs the current uncompressed APEX with the given payload/container keys.

  Args:
    avbtool: The AVB tool to use.
    apex_file: Uncompressed APEX file.
    payload_key: The path to payload signing key (w/ extension).
    container_key: The path to container signing key (w/o extension).
    container_pw: A dict that maps key names to their passwords (looked up
        with container_key and with the apk key names), or None.
    apk_keys: A dict that holds the signing keys for apk files.
    codename_to_api_level_map: A dict that maps from codename to API level.
    no_hashtree: Don't include hashtree in the signed APEX. If None, it is
        derived from the 'Tree Size' of the existing payload.
    signing_args: Additional args to be passed to the payload signer.

  Returns:
    The path to the signed APEX file.
  """
  # 1. Extract the apex payload image and sign the containing apk files. Repack
  # the apex file after signing.
  apk_signer = ApexApkSigner(apex_file, container_pw,
                             codename_to_api_level_map)
  apex_file = apk_signer.ProcessApexFile(apk_keys, payload_key, signing_args)

  # 2a. Extract and sign the APEX_PAYLOAD_IMAGE entry with the given
  # payload_key.
  payload_dir = common.MakeTempDir(prefix='apex-payload-')
  with zipfile.ZipFile(apex_file) as apex_fd:
    payload_file = apex_fd.extract(APEX_PAYLOAD_IMAGE, payload_dir)
    zip_items = apex_fd.namelist()

  payload_info = ParseApexPayloadInfo(avbtool, payload_file)
  if no_hashtree is None:
    no_hashtree = _PayloadHasNoHashtree(payload_info)
  SignApexPayload(
      avbtool,
      payload_file,
      payload_key,
      payload_info['apex.key'],
      payload_info['Algorithm'],
      payload_info['Salt'],
      payload_info['Hash Algorithm'],
      no_hashtree,
      signing_args)

  # 2b. Update the embedded payload public key.
  payload_public_key = common.ExtractAvbPublicKey(avbtool, payload_key)
  common.ZipDelete(apex_file, APEX_PAYLOAD_IMAGE)
  if APEX_PUBKEY in zip_items:
    common.ZipDelete(apex_file, APEX_PUBKEY)
  apex_zip = zipfile.ZipFile(apex_file, 'a', allowZip64=True)
  try:
    common.ZipWrite(apex_zip, payload_file, arcname=APEX_PAYLOAD_IMAGE)
    common.ZipWrite(apex_zip, payload_public_key, arcname=APEX_PUBKEY)
  finally:
    # Always release the archive handle, even if one of the writes failed.
    common.ZipClose(apex_zip)

  # 3. Align the files at page boundary (same as in apexer).
  aligned_apex = common.MakeTempFile(prefix='apex-container-', suffix='.apex')
  common.RunAndCheckOutput(['zipalign', '-f', '4096', apex_file, aligned_apex])

  # 4. Sign the APEX container with container_key.
  return _SignApexContainer(aligned_apex, container_key, container_pw,
                            codename_to_api_level_map, '.apex')


def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
                       container_pw, apk_keys, codename_to_api_level_map,
                       no_hashtree, signing_args=None):
  """Signs the current compressed APEX with the given payload/container keys.

  Args:
    avbtool: The AVB tool to use.
    apex_file: Path to the compressed APEX file.
    payload_key: The path to payload signing key (w/ extension).
    container_key: The path to container signing key (w/o extension).
    container_pw: A dict that maps key names to their passwords (looked up
        with container_key and with the apk key names), or None.
    apk_keys: A dict that holds the signing keys for apk files.
    codename_to_api_level_map: A dict that maps from codename to API level.
    no_hashtree: Don't include hashtree in the signed APEX.
    signing_args: Additional args to be passed to the payload signer.

  Returns:
    The path to the signed APEX file.
  """
  debugfs_path = os.path.join(OPTIONS.search_path, 'bin', 'debugfs_static')

  # 1. Decompress original_apex inside compressed apex.
  original_apex_file = common.MakeTempFile(prefix='original-apex-',
                                           suffix='.apex')
  # Decompression target path should not exist
  os.remove(original_apex_file)
  common.RunAndCheckOutput(['deapexer', '--debugfs_path', debugfs_path,
                            'decompress', '--input', apex_file,
                            '--output', original_apex_file])

  # 2. Sign original_apex
  signed_original_apex_file = SignUncompressedApex(
      avbtool,
      original_apex_file,
      payload_key,
      container_key,
      container_pw,
      apk_keys,
      codename_to_api_level_map,
      no_hashtree,
      signing_args)

  # 3. Compress signed original apex.
  compressed_apex_file = common.MakeTempFile(prefix='apex-container-',
                                             suffix='.capex')
  common.RunAndCheckOutput(['apex_compression_tool',
                            'compress',
                            '--apex_compression_tool_path', os.getenv('PATH'),
                            '--input', signed_original_apex_file,
                            '--output', compressed_apex_file])

  # 4. Align apex
  aligned_apex = common.MakeTempFile(prefix='apex-container-', suffix='.capex')
  common.RunAndCheckOutput(['zipalign', '-f', '4096', compressed_apex_file,
                            aligned_apex])

  # 5. Sign the APEX container with container_key.
  return _SignApexContainer(aligned_apex, container_key, container_pw,
                            codename_to_api_level_map, '.capex')


def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
             apk_keys, codename_to_api_level_map,
             no_hashtree, signing_args=None):
  """Signs the current APEX with the given payload/container keys.

  Args:
    avbtool: The AVB tool to use.
    apex_data: Raw content of the APEX file; it is written to a temp file in
        binary mode before being processed.
    payload_key: The path to payload signing key (w/ extension).
    container_key: The path to container signing key (w/o extension).
    container_pw: A dict that maps key names to their passwords (looked up
        with container_key and with the apk key names), or None.
    apk_keys: A dict that holds the signing keys for apk files.
    codename_to_api_level_map: A dict that maps from codename to API level.
    no_hashtree: Don't include hashtree in the signed APEX.
    signing_args: Additional args to be passed to the payload signer.

  Returns:
    The path to the signed APEX file.

  Raises:
    ApexInfoError: If the APEX type is unsupported, or an external command
        fails with common.ExternalError.
  """
  apex_file = common.MakeTempFile(prefix='apex-container-', suffix='.apex')
  with open(apex_file, 'wb') as output_fp:
    output_fp.write(apex_data)

  debugfs_path = os.path.join(OPTIONS.search_path, 'bin', 'debugfs_static')
  cmd = ['deapexer', '--debugfs_path', debugfs_path,
         'info', '--print-type', apex_file]

  try:
    apex_type = common.RunAndCheckOutput(cmd).strip()
    if apex_type == 'UNCOMPRESSED':
      return SignUncompressedApex(
          avbtool,
          apex_file,
          payload_key=payload_key,
          container_key=container_key,
          # Forward the caller's passwords; they were previously dropped.
          container_pw=container_pw,
          codename_to_api_level_map=codename_to_api_level_map,
          no_hashtree=no_hashtree,
          apk_keys=apk_keys,
          signing_args=signing_args)
    elif apex_type == 'COMPRESSED':
      return SignCompressedApex(
          avbtool,
          apex_file,
          payload_key=payload_key,
          container_key=container_key,
          container_pw=container_pw,
          codename_to_api_level_map=codename_to_api_level_map,
          no_hashtree=no_hashtree,
          apk_keys=apk_keys,
          signing_args=signing_args)
    else:
      # Neither of the two types reported by deapexer that we can sign.
      raise ApexInfoError('Unsupported apex type {}'.format(apex_type))

  except common.ExternalError as e:
    raise ApexInfoError(
        'Failed to get type for {}:\n{}'.format(apex_file, e))


def GetApexInfoFromTargetFiles(input_file, partition, compressed_only=True):
  """
  Get information about APEX files stored in the input_file zip or directory.

  Args:
    input_file: The filename of the target build target-files zip or directory.
    partition: Name of the partition to scan; the APEX files are looked up
        under '<PARTITION>/apex' (partition name upper-cased).
    compressed_only: If true, only compressed APEX files are reported.

  Return:
    A list of ota_metadata_pb2.ApexInfo() populated using the APEX stored in
    the given partition of the input_file. Empty if the APEX directory does
    not exist.

  Raises:
    RuntimeError: If input_file is not a str, or deapexer reports a type other
        than COMPRESSED / UNCOMPRESSED for a file.
  """

  # Extract the apex files so that we can run checks on them
  if not isinstance(input_file, str):
    raise RuntimeError("must pass filepath to target-files zip or directory")

  apex_subdir = os.path.join(partition.upper(), 'apex')
  if os.path.isdir(input_file):
    tmp_dir = input_file
  else:
    tmp_dir = UnzipTemp(input_file, [os.path.join(apex_subdir, '*')])
  target_dir = os.path.join(tmp_dir, apex_subdir)

  # Partial target-files packages for vendor-only builds may not contain
  # a system apex directory.
  if not os.path.exists(target_dir):
    logger.info('No APEX directory at path: %s', target_dir)
    return []

  apex_infos = []

  # Prefer the tools under the search path, falling back to the ones on PATH.
  debugfs_path = "debugfs"
  deapexer = 'deapexer'
  if OPTIONS.search_path:
    debugfs_path = os.path.join(OPTIONS.search_path, "bin", "debugfs_static")
    deapexer_path = os.path.join(OPTIONS.search_path, "bin", "deapexer")
    if os.path.isfile(deapexer_path):
      deapexer = deapexer_path

  for apex_filename in os.listdir(target_dir):
    apex_filepath = os.path.join(target_dir, apex_filename)
    if not os.path.isfile(apex_filepath) or \
            not zipfile.is_zipfile(apex_filepath):
      logger.info("Skipping %s because it's not a zipfile", apex_filepath)
      continue
    apex_info = ota_metadata_pb2.ApexInfo()
    # Open the apex file to retrieve information
    manifest = apex_manifest.fromApex(apex_filepath)
    apex_info.package_name = manifest.name
    apex_info.version = manifest.version
    # Check if the file is compressed or not
    apex_type = RunAndCheckOutput([
        deapexer, "--debugfs_path", debugfs_path,
        'info', '--print-type', apex_filepath]).rstrip()
    if apex_type == 'COMPRESSED':
      apex_info.is_compressed = True
    elif apex_type == 'UNCOMPRESSED':
      apex_info.is_compressed = False
    else:
      raise RuntimeError('Not an APEX file: ' + apex_type)

    # Decompress compressed APEX to determine its size
    if apex_info.is_compressed:
      decompressed_file_path = MakeTempFile(prefix="decompressed-",
                                            suffix=".apex")
      # Decompression target path should not exist
      os.remove(decompressed_file_path)
      RunAndCheckOutput([deapexer, 'decompress', '--input', apex_filepath,
                         '--output', decompressed_file_path])
      apex_info.decompressed_size = os.path.getsize(decompressed_file_path)

    if not compressed_only or apex_info.is_compressed:
      apex_infos.append(apex_info)

  return apex_infos