#!/usr/bin/env python
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for apexer."""

import hashlib
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import unittest
from zipfile import ZipFile

from apex_manifest import ValidateApexManifest
from apex_manifest import ParseApexManifest

logger = logging.getLogger(__name__)

TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")


def run(args, verbose=None, **kwargs):
    """Creates and returns a subprocess.Popen object.

    Args:
      args: The command represented as a list of strings.
      verbose: When truthy, the command line is logged through the module
          logger. Independently of this flag, the command line is printed to
          stdout whenever DEBUG_TEST is set.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as
          env, stdin, etc. stdout and stderr will default to subprocess.PIPE
          and subprocess.STDOUT respectively unless caller specifies any of
          them. universal_newlines will default to True, so that the output
          is returned as a string.

    Returns:
      A subprocess.Popen object.
    """
    # Only install the stdout/stderr defaults when the caller set neither one.
    if 'stdout' not in kwargs and 'stderr' not in kwargs:
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.STDOUT
    kwargs.setdefault('universal_newlines', True)
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    # Don't log any if caller explicitly says so.
    if verbose:
        logger.info(" Running: \"%s\"", " ".join(args))
    return subprocess.Popen(args, **kwargs)


def run_host_command(args, verbose=None, **kwargs):
    """Runs a host tool and returns its output.

    When the ANDROID_BUILD_TOP environment variable is set, the executable
    named by args[0] is resolved against the out/host/linux-x86/bin directory
    below it; otherwise the command is run exactly as given.

    Args:
      args: The command represented as a list of strings. The list itself is
          never modified.
      verbose: Forwarded to run_and_check_output().
      kwargs: Forwarded to run_and_check_output().

    Returns:
      The output string.

    Raises:
      RuntimeError: On non-zero exit from the command.
    """
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
        # Build a fresh list instead of assigning to args[0], so the caller's
        # command list is left untouched.
        args = [os.path.join(host_command_dir, args[0])] + list(args[1:])
    return run_and_check_output(args, verbose, **kwargs)


def run_and_check_output(args, verbose=None, **kwargs):
    """Runs the given command and returns the output.

    Args:
      args: The command represented as a list of strings.
      verbose: When truthy, the command line and its output are logged through
          the module logger.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as
          env, stdin, etc. stdout and stderr will default to subprocess.PIPE
          and subprocess.STDOUT respectively unless caller specifies any of
          them.

    Returns:
      The output string. An empty string is returned when no output was
      captured.

    Raises:
      RuntimeError: On non-zero exit from the command.
    """
    proc = run(args, verbose=verbose, **kwargs)
    output, _ = proc.communicate()
    # communicate() yields None for stdout when the caller did not pipe it.
    if output is None:
        output = ""
    # Don't log any if caller explicitly says so.
    if verbose:
        logger.info("%s", output.rstrip())
    if proc.returncode != 0:
        raise RuntimeError(
            "Failed to run command '{}' (exit code {}):\n{}".format(
                args, proc.returncode, output))
    return output


def get_sha1sum(file_path):
    """Returns the hex digest of the content of the file at file_path.

    NOTE: despite the function name, the digest is computed with SHA-256.
    The name is kept so that existing callers keep working.
    """
    h = hashlib.sha256()

    with open(file_path, 'rb') as stream:
        # Reading is buffered, so we can read smaller chunks. read() returns
        # an empty bytes object at end of file, which stops the iteration.
        for chunk in iter(lambda: stream.read(h.block_size), b''):
            h.update(chunk)

    return h.hexdigest()


def get_current_dir():
    """Returns the directory that contains this script, with symlinks resolved."""
    # The script dir is the one we want, which could be different from pwd.
    current_dir = os.path.dirname(os.path.realpath(__file__))
    return current_dir


def round_up(size, unit):
    """Rounds size up to the next multiple of unit.

    The bit-mask arithmetic is only valid when unit is a power of two, which
    is what the assertion checks.
    """
    assert unit & (unit - 1) == 0
    return (size + unit - 1) & (~(unit - 1))


# In order to debug test failures, set DEBUG_TEST to True and run the test from
# local workstation bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# The test will print out the command used, and the temporary files used by the
# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
# different.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs I had mild success using the vbindiff utility.
DEBUG_TEST = False


class ApexerRebuildTest(unittest.TestCase):
    """Repacks prebuilt test APEX files with apexer and compares the results.

    Each test unpacks a prebuilt APEX that lives next to this script, feeds the
    extracted pieces back into apexer (and, where needed, the signing tools),
    and asserts that the digest of the rebuilt artifact equals the digest of
    the original one.
    """

    # Environment variables that setUp() and _run_apexer() overwrite. Their
    # pre-test values are restored once each test has finished.
    _MUTATED_ENV_VARS = ("PATH", "LD_LIBRARY_PATH", "APEXER_TOOL_PATH")

    # Files found in an extracted payload that are added by apexer rather than
    # coming from the payload input directory.
    _GENERATED_PAYLOAD_FILES = ("apex_manifest.json", "apex_manifest.pb")

    def setUp(self):
        """Prepares the cleanup list and the host tools for one test."""
        self._to_cleanup = []
        # Snapshot the environment before anything modifies it. Registering
        # the restore with addCleanup() makes it run even if setUp() fails
        # part-way through.
        saved_env = dict(
            (name, os.environ.get(name)) for name in self._MUTATED_ENV_VARS)
        self.addCleanup(self._restore_environment, saved_env)
        self._get_host_tools(
            os.path.join(get_current_dir(), "apexer_test_host_tools.zip"))

    def tearDown(self):
        """Deletes the temporary files and directories created by the test.

        When DEBUG_TEST is set nothing is deleted; the paths are printed
        instead so that they can be inspected.
        """
        if DEBUG_TEST:
            print(self._to_cleanup)
            return
        for path in self._to_cleanup:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            elif os.path.lexists(path):
                # Skip entries that are already gone so that one missing file
                # does not prevent the remaining entries from being removed.
                os.remove(path)
        del self._to_cleanup[:]

    @staticmethod
    def _restore_environment(saved_env):
        """Puts the given environment variables back to their saved values.

        Args:
          saved_env: Maps a variable name to its previous value, or to None if
              the variable was not set.
        """
        for name, value in saved_env.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value

    @classmethod
    def _remove_generated_payload_entries(cls, dir_name):
        """Removes payload files added by apexer and e2fs tools from dir_name."""
        for name in cls._GENERATED_PAYLOAD_FILES:
            file_path = os.path.join(dir_name, name)
            if os.path.exists(file_path):
                os.remove(file_path)
        lost_found = os.path.join(dir_name, "lost+found")
        if os.path.isdir(lost_found):
            shutil.rmtree(lost_found)

    def _get_host_tools(self, host_tools_file_path):
        """Unpacks the host tools archive and exposes the tools to the test.

        Sets self.host_tools (tool name -> extracted path, or the bare tool
        name when the archive did not provide it) and self.host_tools_path,
        and prepends the extracted bin/ and lib64/ directories to PATH and
        LD_LIBRARY_PATH.

        Args:
          host_tools_file_path: Path of the zip file holding the host tools.
              A missing file is tolerated; every tool then falls back to its
              bare name.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + "_host_tools_")
        self._to_cleanup.append(dir_name)
        if os.path.isfile(host_tools_file_path):
            with ZipFile(host_tools_file_path, 'r') as zip_obj:
                zip_obj.extractall(path=dir_name)

        bin_dir = os.path.join(dir_name, "bin")
        files = {}
        for tool in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile",
                     "e2fsdroid", "resize2fs", "soong_zip", "aapt2", "merge_zips",
                     "zipalign", "debugfs_static", "signapk.jar", "android.jar",
                     "blkid", "fsck.erofs"]:
            file_path = os.path.join(bin_dir, tool)
            if os.path.exists(file_path):
                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
                files[tool] = file_path
            else:
                files[tool] = tool
        self.host_tools = files
        self.host_tools_path = bin_dir

        path = bin_dir
        if "PATH" in os.environ:
            path += ":" + os.environ["PATH"]
        os.environ["PATH"] = path

        ld_library_path = os.path.join(dir_name, "lib64")
        if "LD_LIBRARY_PATH" in os.environ:
            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        os.environ["LD_LIBRARY_PATH"] = ld_library_path

    def _get_container_files(self, apex_file_path):
        """Unzips an APEX container and returns the paths of its known entries.

        Args:
          apex_file_path: Path of the APEX file to unzip.

        Returns:
          A dict mapping each container entry name that was found to its
          extracted path, plus the key "apex_payload" that points at whichever
          of apex_payload.img / apex_payload.zip is present. The test fails if
          the manifest, the build info or the payload is missing.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + "_container_files_")
        self._to_cleanup.append(dir_name)
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)
        files = {}
        for entry in ["apex_manifest.json", "apex_manifest.pb",
                      "apex_build_info.pb", "assets",
                      "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, entry)
            if os.path.exists(file_path):
                files[entry] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        image_file = None
        if "apex_payload.img" in files:
            image_file = files["apex_payload.img"]
        elif "apex_payload.zip" in files:
            image_file = files["apex_payload.zip"]
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _extract_payload_from_img(self, img_file_path):
        """Dumps the content of a payload image into a new temporary directory.

        Args:
          img_file_path: Path of the payload image, read with debugfs_static.

        Returns:
          The directory holding the extracted payload, without the files added
          by apexer and e2fs tools.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + "_extracted_payload_")
        self._to_cleanup.append(dir_name)
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)

        self._remove_generated_payload_entries(dir_name)
        return dir_name

    def _extract_payload(self, apex_file_path):
        """Extracts the payload of an APEX file into a new temporary directory.

        Args:
          apex_file_path: Path of the APEX file, unpacked with deapexer.

        Returns:
          The directory holding the extracted payload, without the files added
          by apexer and e2fs tools.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + "_extracted_payload_")
        self._to_cleanup.append(dir_name)
        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
               "--blkid_path", self.host_tools["blkid"], "--fsckerofs_path",
               self.host_tools["fsck.erofs"], "extract", apex_file_path, dir_name]
        run_host_command(cmd)

        self._remove_generated_payload_entries(dir_name)
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=None):
        """Runs apexer on payload_dir and returns the path of its output file.

        Args:
          container_files: Dict as returned by _get_container_files(); supplies
              the manifest, build info and (optionally) the assets directory.
          payload_dir: Directory with the payload content to pack.
          args: Optional list of extra apexer arguments. "--payload_only" and
              "--unsigned_payload_only" also change which other arguments are
              passed and the suffix of the output file.

        Returns:
          The path of the file written by apexer.
        """
        args = [] if args is None else args
        unsigned_payload_only = "--unsigned_payload_only" in args
        payload_only = unsigned_payload_only or "--payload_only" in args

        os.environ["APEXER_TOOL_PATH"] = (
            self.host_tools_path +
            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        if DEBUG_TEST:
            cmd.append('-v')
        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
        cmd.extend(args)

        # Decide on output file name
        apex_suffix = ".payload" if payload_only else ".apex.unsigned"
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_repacked_",
                                  suffix=apex_suffix)
        os.close(fd)
        self._to_cleanup.append(fn)
        cmd.extend([payload_dir, fn])

        run_host_command(cmd)
        return fn

    def _get_java_toolchain(self):
        """Picks the java binary and the native library path used for signing.

        Returns:
          A two-element list: the java executable to run and the value for
          java.library.path. The first matching candidate wins, checked in
          this order: the prebuilt JDK 17 paths, then the
          ANDROID_JAVA_TOOLCHAIN, ANDROID_JAVA_HOME and JAVA_HOME environment
          variables, falling back to plain "java".
        """
        java_toolchain = "java"
        if os.path.isfile("prebuilts/jdk/jdk17/linux-x86/bin/java"):
            java_toolchain = "prebuilts/jdk/jdk17/linux-x86/bin/java"
        elif os.path.isfile("/jdk/jdk17/linux-x86/bin/java"):
            java_toolchain = "/jdk/jdk17/linux-x86/bin/java"
        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
        elif "ANDROID_JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
        elif "JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

        # LD_LIBRARY_PATH is always present here because setUp() sets it.
        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        if "ANDROID_BUILD_TOP" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                                               "out/host/linux-x86/lib64")

        return [java_toolchain, java_dep_lib]

    def _sign_apk_container(self, unsigned_apex):
        """Signs an unsigned APEX container with signapk.jar.

        Args:
          unsigned_apex: Path of the unsigned APEX file.

        Returns:
          The path of the newly created signed APEX file.
        """
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_repacked_",
                                  suffix=".apex")
        os.close(fd)
        self._to_cleanup.append(fn)
        java_toolchain, java_dep_lib = self._get_java_toolchain()
        cmd = [
            java_toolchain,
            "-Djava.library.path=" + java_dep_lib,
            "-jar", self.host_tools['signapk.jar'],
            "-a", "4096", "--align-file-size",
            os.path.join(get_current_dir(), TEST_X509_KEY),
            os.path.join(get_current_dir(), TEST_PK8_KEY),
            unsigned_apex, fn]
        run_and_check_output(cmd)
        return fn

    def _sign_payload(self, container_files, unsigned_payload):
        """Signs a copy of an unsigned payload image with avbtool.

        Args:
          container_files: Dict as returned by _get_container_files(); its
              apex_manifest.pb supplies the apex name and the salt.
          unsigned_payload: Path of the unsigned payload image. It is copied,
              not modified.

        Returns:
          The path of the signed copy of the payload.
        """
        fd, signed_payload = tempfile.mkstemp(
            prefix=self._testMethodName + "_repacked_", suffix=".payload")
        os.close(fd)
        self._to_cleanup.append(signed_payload)
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
        manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
        ValidateApexManifest(manifest_apex)
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        cmd.append('--no_hashtree')
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        """Asserts that repacking and re-signing an APEX reproduces its digest.

        Args:
          apex_name: Name of the prebuilt APEX next to this script, without
              the trailing ".apex" file extension.
        """
        apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
        if DEBUG_TEST:
            # Keep a copy of the input next to the other temporary files so
            # that it can be compared with the repacked output.
            fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_input_",
                                      suffix=".apex")
            os.close(fd)
            shutil.copyfile(apex_file_path, fn)
            self._to_cleanup.append(fn)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path),
                         get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                  ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(
            container_files, payload_dir, ["--unsigned_payload_only"])
        # Verification of the unsigned image is expected to fail.
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image",
                      str(error.exception))
        signed_payload = self._sign_payload(container_files,
                                            unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

        # Now assert that given an unsigned image and the original container
        # files, we can produce an identical unsigned image.
        unsigned_payload_dir = self._extract_payload_from_img(
            unsigned_payload_only_file_path)
        unsigned_payload_only_2_file_path = self._run_apexer(
            container_files, unsigned_payload_dir, ["--unsigned_payload_only"])
        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                         get_sha1sum(unsigned_payload_only_2_file_path))

    def test_apex_with_logging_parent(self):
        self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
        self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)


if __name__ == '__main__':
    unittest.main(verbosity=2)