#!/usr/bin/env python
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for apexer."""

import hashlib
import logging
import os
import re
import shutil
import stat
import subprocess
import tempfile
import unittest
from zipfile import ZipFile

from apex_manifest import ValidateApexManifest

logger = logging.getLogger(__name__)

TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")


def run(args, verbose=None, **kwargs):
  """Creates and returns a subprocess.Popen object.

  Args:
    args: The command represented as a list of strings.
    verbose: Whether the command should be logged through the module logger.
        Nothing is logged when it is unspecified or falsy.
    kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
        stdin, etc. stdout and stderr will default to subprocess.PIPE and
        subprocess.STDOUT respectively unless caller specifies any of them.
        universal_newlines will default to True, as most of the users in
        releasetools expect string output.

  Returns:
    A subprocess.Popen object.
  """
  if 'stdout' not in kwargs and 'stderr' not in kwargs:
    kwargs['stdout'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.STDOUT
  if 'universal_newlines' not in kwargs:
    kwargs['universal_newlines'] = True
  # DEBUG_TEST always prints the command line; the logger is only used when
  # the caller asked for verbose output.
  if DEBUG_TEST:
    print("\nRunning: \n%s\n" % " ".join(args))
  if verbose:
    logger.info(" Running: \"%s\"", " ".join(args))
  return subprocess.Popen(args, **kwargs)


def run_host_command(args, verbose=None, **kwargs):
  """Runs a host tool and returns its output.

  When ANDROID_BUILD_TOP is set in the environment, the executable named by
  args[0] is looked up under $ANDROID_BUILD_TOP/out/host/linux-x86/bin.
  Otherwise the command is run as given.

  Args:
    args: The command represented as a list of strings. The list is not
        modified.
    verbose: Forwarded to run_and_check_output().
    kwargs: Forwarded to run_and_check_output().

  Returns:
    The output string.

  Raises:
    RuntimeError: On non-zero exit from the command.
  """
  host_build_top = os.environ.get("ANDROID_BUILD_TOP")
  if host_build_top:
    host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
    # Build a new list rather than assigning to args[0], so that the list
    # owned by the caller is left untouched.
    args = [os.path.join(host_command_dir, args[0])] + list(args[1:])
  return run_and_check_output(args, verbose, **kwargs)


def run_and_check_output(args, verbose=None, **kwargs):
  """Runs the given command and returns the output.

  Args:
    args: The command represented as a list of strings.
    verbose: Whether the command and its output should be logged through the
        module logger. Nothing is logged when it is unspecified or falsy.
    kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
        stdin, etc. stdout and stderr will default to subprocess.PIPE and
        subprocess.STDOUT respectively unless caller specifies any of them.

  Returns:
    The output string.

  Raises:
    RuntimeError: On non-zero exit from the command.
  """
  proc = run(args, verbose=verbose, **kwargs)
  output, _ = proc.communicate()
  # communicate() yields None for stdout when the caller redirected it
  # somewhere other than a pipe.
  if output is None:
    output = ""
  # Don't log any if caller explicitly says so.
  if verbose:
    logger.info("%s", output.rstrip())
  if proc.returncode != 0:
    raise RuntimeError(
        "Failed to run command '{}' (exit code {}):\n{}".format(
            args, proc.returncode, output))
  return output


def get_sha1sum(file_path):
  """Returns the hex digest of the content of the given file.

  NOTE: despite the function name, the digest is computed with SHA-256. The
  name is kept because callers refer to it.

  Args:
    file_path: Path of the file to hash.

  Returns:
    The hexadecimal digest string.
  """
  h = hashlib.sha256()

  with open(file_path, 'rb') as f:
    while True:
      # Reading is buffered, so we can read smaller chunks.
      chunk = f.read(h.block_size)
      if not chunk:
        break
      h.update(chunk)

  return h.hexdigest()


def get_current_dir():
  """Returns the directory that contains this script, with symlinks resolved."""
  # The script dir is the one we want, which could be different from pwd.
  current_dir = os.path.dirname(os.path.realpath(__file__))
  return current_dir


def round_up(size, unit):
  """Rounds size up to the next multiple of unit.

  Args:
    size: The value to round.
    unit: The alignment; the assertion below rejects any value for which
        unit & (unit - 1) is non-zero.

  Returns:
    The smallest multiple of unit that is greater than or equal to size.
  """
  assert unit & (unit - 1) == 0
  return (size + unit - 1) & (~(unit - 1))

# In order to debug test failures, set DEBUG_TEST to True and run the test from
# local workstation bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# the test will print out the command used, and the temporary files used by the
# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
# different.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs I had mild success using the vbindiff utility.
DEBUG_TEST = False


class ApexerRebuildTest(unittest.TestCase):
  """Repacks prebuilt test APEX files with apexer and compares the results.

  Each test unpacks a prebuilt APEX that sits next to this script, rebuilds it
  (or only its payload) with the host tools, and asserts that the digest of the
  rebuilt file equals the digest of the original.
  """

  def setUp(self):
    self._to_cleanup = []
    # _get_host_tools() and _run_apexer() modify os.environ. Snapshot it now
    # and put it back once the test is over, so that entries pointing at this
    # test's (soon to be deleted) temp dir don't pile up in later tests.
    self.addCleanup(self._restore_environ, dict(os.environ))
    self._get_host_tools(os.path.join(get_current_dir(), "apexer_test_host_tools.zip"))

  def tearDown(self):
    if DEBUG_TEST:
      # Keep the temporary files around for inspection and just list them.
      print(self._to_cleanup)
      return
    for i in self._to_cleanup:
      if os.path.isdir(i):
        shutil.rmtree(i, ignore_errors=True)
      elif os.path.lexists(i):
        # A registered file may already be gone (e.g. a tool removed its
        # output before failing); that must not abort the remaining cleanup.
        os.remove(i)
    del self._to_cleanup[:]

  @staticmethod
  def _restore_environ(saved_environ):
    """Resets os.environ to the given snapshot (a plain dict)."""
    os.environ.clear()
    os.environ.update(saved_environ)

  def _get_host_tools(self, host_tools_file_path):
    """Unpacks the host tools zip (if present) and points the environment at it.

    Sets self.host_tools (tool name -> path of the unpacked tool, or the bare
    tool name when the tool was not unpacked) and self.host_tools_path, and
    prepends the unpacked bin/ and lib64/ directories to PATH and
    LD_LIBRARY_PATH respectively.

    Args:
      host_tools_file_path: Path of the host tools zip file. A missing file is
          not an error; every tool then falls back to its bare name.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_")
    self._to_cleanup.append(dir_name)
    if os.path.isfile(host_tools_file_path):
      with ZipFile(host_tools_file_path, 'r') as zip_obj:
        zip_obj.extractall(path=dir_name)

    files = {}
    for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
              "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
              "signapk.jar", "android.jar"]:
      file_path = os.path.join(dir_name, "bin", i)
      if os.path.exists(file_path):
        os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
        files[i] = file_path
      else:
        files[i] = i
    self.host_tools = files
    self.host_tools_path = os.path.join(dir_name, "bin")

    path = self.host_tools_path
    if "PATH" in os.environ:
      path += ":" + os.environ["PATH"]
    os.environ["PATH"] = path

    ld_library_path = os.path.join(dir_name, "lib64")
    if "LD_LIBRARY_PATH" in os.environ:
      ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
    if "ANDROID_HOST_OUT" in os.environ:
      ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
    os.environ["LD_LIBRARY_PATH"] = ld_library_path

  def _get_container_files(self, apex_file_path):
    """Unzips the APEX container and returns the paths of its known entries.

    Args:
      apex_file_path: Path of the APEX file to unzip.

    Returns:
      A dict mapping each entry name found in the container to its extracted
      path, plus the key "apex_payload" which refers to apex_payload.img when
      it exists and to apex_payload.zip otherwise.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
    self._to_cleanup.append(dir_name)
    with ZipFile(apex_file_path, 'r') as zip_obj:
      zip_obj.extractall(path=dir_name)
    files = {}
    for i in ["apex_manifest.json", "apex_manifest.pb",
              "apex_build_info.pb", "assets",
              "apex_payload.img", "apex_payload.zip"]:
      file_path = os.path.join(dir_name, i)
      if os.path.exists(file_path):
        files[i] = file_path
    self.assertIn("apex_manifest.pb", files)
    self.assertIn("apex_build_info.pb", files)

    image_file = None
    if "apex_payload.img" in files:
      image_file = files["apex_payload.img"]
    elif "apex_payload.zip" in files:
      image_file = files["apex_payload.zip"]
    self.assertIsNotNone(image_file)
    files["apex_payload"] = image_file

    return files

  def _remove_generated_payload_files(self, dir_name):
    """Removes payload files added by apexer and e2fs tools from dir_name."""
    for i in ["apex_manifest.json", "apex_manifest.pb"]:
      if os.path.exists(os.path.join(dir_name, i)):
        os.remove(os.path.join(dir_name, i))
    if os.path.isdir(os.path.join(dir_name, "lost+found")):
      shutil.rmtree(os.path.join(dir_name, "lost+found"))

  def _extract_payload_from_img(self, img_file_path):
    """Dumps a payload image into a new temp dir using debugfs_static.

    Args:
      img_file_path: Path of the payload image.

    Returns:
      The directory holding the extracted payload, without the files added by
      apexer and e2fs tools.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
    self._to_cleanup.append(dir_name)
    cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
    run_host_command(cmd)

    self._remove_generated_payload_files(dir_name)
    return dir_name

  def _extract_payload(self, apex_file_path):
    """Extracts the payload of an APEX file into a new temp dir using deapexer.

    Args:
      apex_file_path: Path of the APEX file.

    Returns:
      The directory holding the extracted payload, without the files added by
      apexer and e2fs tools.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
    self._to_cleanup.append(dir_name)
    cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
           "extract", apex_file_path, dir_name]
    run_host_command(cmd)

    self._remove_generated_payload_files(dir_name)
    return dir_name

  def _run_apexer(self, container_files, payload_dir, args=None):
    """Runs apexer on payload_dir and returns the path of the output file.

    Args:
      container_files: The dict returned by _get_container_files().
      payload_dir: Directory with the payload content to pack.
      args: Optional list of extra apexer flags. "--payload_only" and
          "--unsigned_payload_only" additionally change which options are
          passed and the suffix of the output file.

    Returns:
      The path of the file written by apexer.
    """
    if args is None:
      args = []
    unsigned_payload_only = "--unsigned_payload_only" in args
    payload_only = unsigned_payload_only or "--payload_only" in args

    os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
        ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
    cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
    if DEBUG_TEST:
      cmd.append('-v')
    cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
    cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
    cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
    if "apex_manifest.json" in container_files:
      cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
    cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
    if not payload_only and "assets" in container_files:
      cmd.extend(["--assets_dir", container_files["assets"]])
    if not unsigned_payload_only:
      cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
      cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
    cmd.extend(args)

    # Decide on output file name
    apex_suffix = ".apex.unsigned"
    if payload_only:
      apex_suffix = ".payload"
    fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
    os.close(fd)
    self._to_cleanup.append(fn)
    cmd.extend([payload_dir, fn])

    run_host_command(cmd)
    return fn

  def _get_java_toolchain(self):
    """Picks the java binary and the native library search path for it.

    The prebuilt JDK 11 binary (relative to the working directory) is preferred
    when it exists; otherwise ANDROID_JAVA_TOOLCHAIN, ANDROID_JAVA_HOME and
    JAVA_HOME are consulted in that order, falling back to plain "java".

    Returns:
      A two-element list: the java executable, and a ":"-separated library
      path built from LD_LIBRARY_PATH plus the host lib64 directories.
    """
    java_toolchain = "java"
    if os.path.isfile("prebuilts/jdk/jdk11/linux-x86/bin/java"):
      java_toolchain = "prebuilts/jdk/jdk11/linux-x86/bin/java"
    elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
      java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
    elif "ANDROID_JAVA_HOME" in os.environ:
      java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
    elif "JAVA_HOME" in os.environ:
      java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

    # LD_LIBRARY_PATH is always present here: _get_host_tools() sets it.
    java_dep_lib = os.environ["LD_LIBRARY_PATH"]
    if "ANDROID_HOST_OUT" in os.environ:
      java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
    if "ANDROID_BUILD_TOP" in os.environ:
      java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                                         "out/host/linux-x86/lib64")

    return [java_toolchain, java_dep_lib]

  def _sign_apk_container(self, unsigned_apex):
    """Signs the container with signapk.jar and returns the signed file path.

    Args:
      unsigned_apex: Path of the unsigned APEX produced by apexer.

    Returns:
      The path of the signed APEX file.
    """
    fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
    os.close(fd)
    self._to_cleanup.append(fn)
    java_toolchain, java_dep_lib = self._get_java_toolchain()
    cmd = [
        java_toolchain,
        "-Djava.library.path=" + java_dep_lib,
        "-jar", self.host_tools['signapk.jar'],
        "-a", "4096", "--align-file-size",
        os.path.join(get_current_dir(), TEST_X509_KEY),
        os.path.join(get_current_dir(), TEST_PK8_KEY),
        unsigned_apex, fn]
    run_and_check_output(cmd)
    return fn

  def _sign_payload(self, container_files, unsigned_payload):
    """Signs a copy of the unsigned payload with avbtool.

    Args:
      container_files: The dict returned by _get_container_files(); its
          apex_manifest.pb supplies the key name and the salt.
      unsigned_payload: Path of the unsigned payload image. It is left
          untouched; the signature is added to a copy.

    Returns:
      The path of the signed payload.
    """
    fd, signed_payload = tempfile.mkstemp(
        prefix=self._testMethodName+"_repacked_", suffix=".payload")
    os.close(fd)
    self._to_cleanup.append(signed_payload)
    shutil.copyfile(unsigned_payload, signed_payload)

    cmd = ['avbtool']
    cmd.append('add_hashtree_footer')
    cmd.append('--do_not_generate_fec')
    cmd.extend(['--algorithm', 'SHA256_RSA4096'])
    cmd.extend(['--hash_algorithm', 'sha256'])
    cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
    manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"])
    cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
    # Set up the salt based on manifest content which includes name
    # and version
    salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
    cmd.extend(['--salt', salt])
    cmd.extend(['--image', signed_payload])
    cmd.append('--no_hashtree')
    run_and_check_output(cmd)

    return signed_payload

  def _verify_payload(self, payload):
    """Verifies that the payload is properly signed by avbtool"""
    cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
    run_and_check_output(cmd)

  def _run_build_test(self, apex_name):
    """Repacks and re-signs the named APEX and expects an identical file.

    Args:
      apex_name: File name of the test APEX next to this script, without the
          trailing ".apex" that is appended here.
    """
    apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
    if DEBUG_TEST:
      # Keep a copy of the input under the temp dir so that it can be compared
      # with the repacked output.
      fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
      os.close(fd)
      shutil.copyfile(apex_file_path, fn)
      self._to_cleanup.append(fn)
    container_files = self._get_container_files(apex_file_path)
    payload_dir = self._extract_payload(apex_file_path)
    repack_apex_file_path = self._run_apexer(container_files, payload_dir)
    resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
    self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

  def test_simple_apex(self):
    self._run_build_test(TEST_APEX)

  def test_legacy_apex(self):
    self._run_build_test(TEST_APEX_LEGACY)

  def test_output_payload_only(self):
    """Assert that payload-only output from apexer is same as the payload we get by unzipping
    apex.
    """
    apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
    container_files = self._get_container_files(apex_file_path)
    payload_dir = self._extract_payload(apex_file_path)
    payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
    self._verify_payload(payload_only_file_path)
    self.assertEqual(get_sha1sum(payload_only_file_path),
                     get_sha1sum(container_files["apex_payload"]))

  def test_output_unsigned_payload_only(self):
    """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
    same as the payload we get by unzipping apex.
    """
    apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
    container_files = self._get_container_files(apex_file_path)
    payload_dir = self._extract_payload(apex_file_path)
    unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                       ["--unsigned_payload_only"])
    # The unsigned payload must be rejected by the verification step.
    with self.assertRaises(RuntimeError) as error:
      self._verify_payload(unsigned_payload_only_file_path)
    self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
    signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
    self.assertEqual(get_sha1sum(signed_payload),
                     get_sha1sum(container_files["apex_payload"]))

    # Now assert that given an unsigned image and the original container
    # files, we can produce an identical unsigned image.
    unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
    unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
                                                         ["--unsigned_payload_only"])
    self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                     get_sha1sum(unsigned_payload_only_2_file_path))

  def test_apex_with_logging_parent(self):
    self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

  def test_apex_with_overridden_package_name(self):
    self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)


if __name__ == '__main__':
  unittest.main(verbosity=2)