1#!/usr/bin/env python 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""Unit tests for apexer.""" 18 19import hashlib 20import logging 21import os 22import re 23import shutil 24import stat 25import subprocess 26import tempfile 27import unittest 28from zipfile import ZipFile 29 30from apex_manifest import ValidateApexManifest 31 32logger = logging.getLogger(__name__) 33 34TEST_APEX = "com.android.example.apex" 35TEST_APEX_LEGACY = "com.android.example-legacy.apex" 36TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex" 37TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex" 38 39TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem") 40TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem") 41TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8") 42TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey") 43 44 45def run(args, verbose=None, **kwargs): 46 """Creates and returns a subprocess.Popen object. 47 48 Args: 49 args: The command represented as a list of strings. 50 verbose: Whether the commands should be shown. Default to the global 51 verbosity if unspecified. 52 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 53 stdin, etc. stdout and stderr will default to subprocess.PIPE and 54 subprocess.STDOUT respectively unless caller specifies any of them. 55 universal_newlines will default to True, as most of the users in 56 releasetools expect string output. 57 58 Returns: 59 A subprocess.Popen object. 60 """ 61 if 'stdout' not in kwargs and 'stderr' not in kwargs: 62 kwargs['stdout'] = subprocess.PIPE 63 kwargs['stderr'] = subprocess.STDOUT 64 if 'universal_newlines' not in kwargs: 65 kwargs['universal_newlines'] = True 66 # Don't log any if caller explicitly says so. 67 if DEBUG_TEST: 68 print("\nRunning: \n%s\n" % " ".join(args)) 69 if verbose: 70 logger.info(" Running: \"%s\"", " ".join(args)) 71 return subprocess.Popen(args, **kwargs) 72 73 74def run_host_command(args, verbose=None, **kwargs): 75 host_build_top = os.environ.get("ANDROID_BUILD_TOP") 76 if host_build_top: 77 host_command_dir = os.path.join(host_build_top, "out/soong/host/linux-x86/bin") 78 args[0] = os.path.join(host_command_dir, args[0]) 79 return run_and_check_output(args, verbose, **kwargs) 80 81 82def run_and_check_output(args, verbose=None, **kwargs): 83 """Runs the given command and returns the output. 84 85 Args: 86 args: The command represented as a list of strings. 87 verbose: Whether the commands should be shown. Default to the global 88 verbosity if unspecified. 89 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 90 stdin, etc. stdout and stderr will default to subprocess.PIPE and 91 subprocess.STDOUT respectively unless caller specifies any of them. 92 93 Returns: 94 The output string. 95 96 Raises: 97 ExternalError: On non-zero exit from the command. 98 """ 99 proc = run(args, verbose=verbose, **kwargs) 100 output, _ = proc.communicate() 101 if output is None: 102 output = "" 103 # Don't log any if caller explicitly says so. 104 if verbose: 105 logger.info("%s", output.rstrip()) 106 if proc.returncode != 0: 107 raise RuntimeError( 108 "Failed to run command '{}' (exit code {}):\n{}".format( 109 args, proc.returncode, output)) 110 return output 111 112 113def get_sha1sum(file_path): 114 h = hashlib.sha256() 115 116 with open(file_path, 'rb') as file: 117 while True: 118 # Reading is buffered, so we can read smaller chunks. 119 chunk = file.read(h.block_size) 120 if not chunk: 121 break 122 h.update(chunk) 123 124 return h.hexdigest() 125 126 127def get_current_dir(): 128 """Returns the current dir, relative to the script dir.""" 129 # The script dir is the one we want, which could be different from pwd. 130 current_dir = os.path.dirname(os.path.realpath(__file__)) 131 return current_dir 132 133def round_up(size, unit): 134 assert unit & (unit - 1) == 0 135 return (size + unit - 1) & (~(unit - 1)) 136 137# In order to debug test failures, set DEBUG_TEST to True and run the test from 138# local workstation bypassing atest, e.g.: 139# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test 140# 141# the test will print out the command used, and the temporary files used by the 142# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with 143# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are 144# different. 145# A simple script to analyze the differences: 146# 147# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex 148# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex 149# 150# cd ~/tmp/ 151# rm -rf input output 152# mkdir input output 153# unzip ${FILE_INPUT} -d input/ 154# unzip ${FILE_OUTPUT} -d output/ 155# 156# diff -r input/ output/ 157# 158# For analyzing binary diffs I had mild success using the vbindiff utility. 159DEBUG_TEST = False 160 161 162class ApexerRebuildTest(unittest.TestCase): 163 def setUp(self): 164 self._to_cleanup = [] 165 self._get_host_tools(os.path.join(get_current_dir(), "apexer_test_host_tools.zip")) 166 167 def tearDown(self): 168 if not DEBUG_TEST: 169 for i in self._to_cleanup: 170 if os.path.isdir(i): 171 shutil.rmtree(i, ignore_errors=True) 172 else: 173 os.remove(i) 174 del self._to_cleanup[:] 175 else: 176 print(self._to_cleanup) 177 178 def _get_host_tools(self, host_tools_file_path): 179 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_") 180 self._to_cleanup.append(dir_name) 181 if os.path.isfile(host_tools_file_path): 182 with ZipFile(host_tools_file_path, 'r') as zip_obj: 183 zip_obj.extractall(path=dir_name) 184 185 files = {} 186 for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid", 187 "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static", 188 "signapk.jar", "android.jar"]: 189 file_path = os.path.join(dir_name, "bin", i) 190 if os.path.exists(file_path): 191 os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR); 192 files[i] = file_path 193 else: 194 files[i] = i 195 self.host_tools = files 196 self.host_tools_path = os.path.join(dir_name, "bin") 197 198 path = os.path.join(dir_name, "bin") 199 if "PATH" in os.environ: 200 path += ":" + os.environ["PATH"] 201 os.environ["PATH"] = path 202 203 ld_library_path = os.path.join(dir_name, "lib64") 204 if "LD_LIBRARY_PATH" in os.environ: 205 ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"] 206 if "ANDROID_HOST_OUT" in os.environ: 207 ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64") 208 os.environ["LD_LIBRARY_PATH"] = ld_library_path 209 210 def _get_container_files(self, apex_file_path): 211 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_") 212 self._to_cleanup.append(dir_name) 213 with ZipFile(apex_file_path, 'r') as zip_obj: 214 zip_obj.extractall(path=dir_name) 215 files = {} 216 for i in ["apex_manifest.json", "apex_manifest.pb", 217 "apex_build_info.pb", "assets", 218 "apex_payload.img", "apex_payload.zip"]: 219 file_path = os.path.join(dir_name, i) 220 if os.path.exists(file_path): 221 files[i] = file_path 222 self.assertIn("apex_manifest.pb", files) 223 self.assertIn("apex_build_info.pb", files) 224 225 image_file = None 226 if "apex_payload.img" in files: 227 image_file = files["apex_payload.img"] 228 elif "apex_payload.zip" in files: 229 image_file = files["apex_payload.zip"] 230 self.assertIsNotNone(image_file) 231 files["apex_payload"] = image_file 232 233 return files 234 235 def _extract_payload_from_img(self, img_file_path): 236 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_") 237 self._to_cleanup.append(dir_name) 238 cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path] 239 run_host_command(cmd) 240 241 # Remove payload files added by apexer and e2fs tools. 242 for i in ["apex_manifest.json", "apex_manifest.pb"]: 243 if os.path.exists(os.path.join(dir_name, i)): 244 os.remove(os.path.join(dir_name, i)) 245 if os.path.isdir(os.path.join(dir_name, "lost+found")): 246 shutil.rmtree(os.path.join(dir_name, "lost+found")) 247 return dir_name 248 249 def _extract_payload(self, apex_file_path): 250 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_") 251 self._to_cleanup.append(dir_name) 252 cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"], 253 "extract", apex_file_path, dir_name] 254 run_host_command(cmd) 255 256 # Remove payload files added by apexer and e2fs tools. 257 for i in ["apex_manifest.json", "apex_manifest.pb"]: 258 if os.path.exists(os.path.join(dir_name, i)): 259 os.remove(os.path.join(dir_name, i)) 260 if os.path.isdir(os.path.join(dir_name, "lost+found")): 261 shutil.rmtree(os.path.join(dir_name, "lost+found")) 262 return dir_name 263 264 def _run_apexer(self, container_files, payload_dir, args=[]): 265 unsigned_payload_only = False 266 payload_only = False 267 if "--unsigned_payload_only" in args: 268 unsigned_payload_only = True 269 if unsigned_payload_only or "--payload_only" in args: 270 payload_only = True 271 272 os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path + 273 ":out/soong/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin") 274 cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"] 275 if DEBUG_TEST: 276 cmd.append('-v') 277 cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]]) 278 cmd.extend(["--android_jar_path", self.host_tools["android.jar"]]) 279 cmd.extend(["--manifest", container_files["apex_manifest.pb"]]) 280 if "apex_manifest.json" in container_files: 281 cmd.extend(["--manifest_json", container_files["apex_manifest.json"]]) 282 cmd.extend(["--build_info", container_files["apex_build_info.pb"]]) 283 if not payload_only and "assets" in container_files: 284 cmd.extend(["--assets_dir", "assets"]) 285 if not unsigned_payload_only: 286 cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)]) 287 cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)]) 288 cmd.extend(args) 289 290 # Decide on output file name 291 apex_suffix = ".apex.unsigned" 292 if payload_only: 293 apex_suffix = ".payload" 294 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix) 295 os.close(fd) 296 self._to_cleanup.append(fn) 297 cmd.extend([payload_dir, fn]) 298 299 run_host_command(cmd) 300 return fn 301 302 def _get_java_toolchain(self): 303 java_toolchain = "java" 304 if os.path.isfile("prebuilts/jdk/jdk11/linux-x86/bin/java"): 305 java_toolchain = "prebuilts/jdk/jdk11/linux-x86/bin/java" 306 elif "ANDROID_JAVA_TOOLCHAIN" in os.environ: 307 java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java") 308 elif "ANDROID_JAVA_HOME" in os.environ: 309 java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java") 310 elif "JAVA_HOME" in os.environ: 311 java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java") 312 313 java_dep_lib = os.environ["LD_LIBRARY_PATH"] 314 if "ANDROID_HOST_OUT" in os.environ: 315 java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64") 316 if "ANDROID_BUILD_TOP" in os.environ: 317 java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"], 318 "out/soong/host/linux-x86/lib64") 319 320 return [java_toolchain, java_dep_lib] 321 322 def _sign_apk_container(self, unsigned_apex): 323 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex") 324 os.close(fd) 325 self._to_cleanup.append(fn) 326 java_toolchain, java_dep_lib = self._get_java_toolchain() 327 cmd = [ 328 java_toolchain, 329 "-Djava.library.path=" + java_dep_lib, 330 "-jar", self.host_tools['signapk.jar'], 331 "-a", "4096", 332 os.path.join(get_current_dir(), TEST_X509_KEY), 333 os.path.join(get_current_dir(), TEST_PK8_KEY), 334 unsigned_apex, fn] 335 run_and_check_output(cmd) 336 return fn 337 338 def _sign_payload(self, container_files, unsigned_payload): 339 fd, signed_payload = \ 340 tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload") 341 os.close(fd) 342 self._to_cleanup.append(signed_payload) 343 shutil.copyfile(unsigned_payload, signed_payload) 344 345 cmd = ['avbtool'] 346 cmd.append('add_hashtree_footer') 347 cmd.append('--do_not_generate_fec') 348 cmd.extend(['--algorithm', 'SHA256_RSA4096']) 349 cmd.extend(['--hash_algorithm', 'sha256']) 350 cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)]) 351 manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"]) 352 cmd.extend(['--prop', 'apex.key:' + manifest_apex.name]) 353 # Set up the salt based on manifest content which includes name 354 # and version 355 salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest() 356 cmd.extend(['--salt', salt]) 357 cmd.extend(['--image', signed_payload]) 358 cmd.append('--no_hashtree') 359 run_and_check_output(cmd) 360 361 return signed_payload 362 363 def _verify_payload(self, payload): 364 """Verifies that the payload is properly signed by avbtool""" 365 cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"] 366 run_and_check_output(cmd) 367 368 def _run_build_test(self, apex_name): 369 apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex") 370 if DEBUG_TEST: 371 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex") 372 os.close(fd) 373 shutil.copyfile(apex_file_path, fn) 374 self._to_cleanup.append(fn) 375 container_files = self._get_container_files(apex_file_path) 376 payload_dir = self._extract_payload(apex_file_path) 377 repack_apex_file_path = self._run_apexer(container_files, payload_dir) 378 resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path) 379 self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path)) 380 381 def test_simple_apex(self): 382 self._run_build_test(TEST_APEX) 383 384 def test_legacy_apex(self): 385 self._run_build_test(TEST_APEX_LEGACY) 386 387 def test_output_payload_only(self): 388 """Assert that payload-only output from apexer is same as the payload we get by unzipping 389 apex. 390 """ 391 apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex") 392 container_files = self._get_container_files(apex_file_path) 393 payload_dir = self._extract_payload(apex_file_path) 394 payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"]) 395 self._verify_payload(payload_only_file_path) 396 self.assertEqual(get_sha1sum(payload_only_file_path), 397 get_sha1sum(container_files["apex_payload"])) 398 399 def test_output_unsigned_payload_only(self): 400 """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is 401 same as the payload we get by unzipping apex. 402 """ 403 apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex") 404 container_files = self._get_container_files(apex_file_path) 405 payload_dir = self._extract_payload(apex_file_path) 406 unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir, 407 ["--unsigned_payload_only"]) 408 with self.assertRaises(RuntimeError) as error: 409 self._verify_payload(unsigned_payload_only_file_path) 410 self.assertIn("Given image does not look like a vbmeta image", str(error.exception)) 411 signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path) 412 self.assertEqual(get_sha1sum(signed_payload), 413 get_sha1sum(container_files["apex_payload"])) 414 415 # Now assert that given an unsigned image and the original container 416 # files, we can produce an identical unsigned image. 417 unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path) 418 unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir, 419 ["--unsigned_payload_only"]) 420 self.assertEqual(get_sha1sum(unsigned_payload_only_file_path), 421 get_sha1sum(unsigned_payload_only_2_file_path)) 422 423 def test_apex_with_logging_parent(self): 424 self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT) 425 426 def test_apex_with_overridden_package_name(self): 427 self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME) 428 429 430if __name__ == '__main__': 431 unittest.main(verbosity=2) 432