1#!/usr/bin/env python 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""Unit tests for apexer.""" 18 19import hashlib 20import logging 21import os 22import re 23import shutil 24import subprocess 25import tempfile 26import unittest 27from zipfile import ZipFile 28 29from apex_manifest import ValidateApexManifest 30 31logger = logging.getLogger(__name__) 32 33TEST_APEX = "com.android.example.apex" 34TEST_APEX_LEGACY = "com.android.example-legacy.apex" 35TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex" 36TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex" 37 38TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem") 39TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem") 40TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8") 41TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey") 42 43 44def run(args, verbose=None, **kwargs): 45 """Creates and returns a subprocess.Popen object. 46 47 Args: 48 args: The command represented as a list of strings. 49 verbose: Whether the commands should be shown. Default to the global 50 verbosity if unspecified. 51 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 52 stdin, etc. stdout and stderr will default to subprocess.PIPE and 53 subprocess.STDOUT respectively unless caller specifies any of them. 54 universal_newlines will default to True, as most of the users in 55 releasetools expect string output. 56 57 Returns: 58 A subprocess.Popen object. 59 """ 60 if 'stdout' not in kwargs and 'stderr' not in kwargs: 61 kwargs['stdout'] = subprocess.PIPE 62 kwargs['stderr'] = subprocess.STDOUT 63 if 'universal_newlines' not in kwargs: 64 kwargs['universal_newlines'] = True 65 # Don't log any if caller explicitly says so. 66 if DEBUG_TEST: 67 print("\nRunning: \n%s\n" % " ".join(args)) 68 if verbose: 69 logger.info(" Running: \"%s\"", " ".join(args)) 70 return subprocess.Popen(args, **kwargs) 71 72 73def run_host_command(args, verbose=None, **kwargs): 74 host_build_top = os.environ.get("ANDROID_BUILD_TOP") 75 if host_build_top: 76 host_command_dir = os.path.join(host_build_top, "out/soong/host/linux-x86/bin") 77 args[0] = os.path.join(host_command_dir, args[0]) 78 return run_and_check_output(args, verbose, **kwargs) 79 80 81def run_and_check_output(args, verbose=None, **kwargs): 82 """Runs the given command and returns the output. 83 84 Args: 85 args: The command represented as a list of strings. 86 verbose: Whether the commands should be shown. Default to the global 87 verbosity if unspecified. 88 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 89 stdin, etc. stdout and stderr will default to subprocess.PIPE and 90 subprocess.STDOUT respectively unless caller specifies any of them. 91 92 Returns: 93 The output string. 94 95 Raises: 96 ExternalError: On non-zero exit from the command. 97 """ 98 proc = run(args, verbose=verbose, **kwargs) 99 output, _ = proc.communicate() 100 if output is None: 101 output = "" 102 # Don't log any if caller explicitly says so. 103 if verbose: 104 logger.info("%s", output.rstrip()) 105 if proc.returncode != 0: 106 raise RuntimeError( 107 "Failed to run command '{}' (exit code {}):\n{}".format( 108 args, proc.returncode, output)) 109 return output 110 111 112def get_sha1sum(file_path): 113 h = hashlib.sha256() 114 115 with open(file_path, 'rb') as file: 116 while True: 117 # Reading is buffered, so we can read smaller chunks. 118 chunk = file.read(h.block_size) 119 if not chunk: 120 break 121 h.update(chunk) 122 123 return h.hexdigest() 124 125 126def get_current_dir(): 127 """Returns the current dir, relative to the script dir.""" 128 # The script dir is the one we want, which could be different from pwd. 129 current_dir = os.path.dirname(os.path.realpath(__file__)) 130 return current_dir 131 132def round_up(size, unit): 133 assert unit & (unit - 1) == 0 134 return (size + unit - 1) & (~(unit - 1)) 135 136# In order to debug test failures, set DEBUG_TEST to True and run the test from 137# local workstation bypassing atest, e.g.: 138# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test 139# 140# the test will print out the command used, and the temporary files used by the 141# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with 142# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are 143# different. 144# A simple script to analyze the differences: 145# 146# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex 147# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex 148# 149# cd ~/tmp/ 150# rm -rf input output 151# mkdir input output 152# unzip ${FILE_INPUT} -d input/ 153# unzip ${FILE_OUTPUT} -d output/ 154# 155# diff -r input/ output/ 156# 157# For analyzing binary diffs I had mild success using the vbindiff utility. 158DEBUG_TEST = False 159 160 161class ApexerRebuildTest(unittest.TestCase): 162 def setUp(self): 163 self._to_cleanup = [] 164 165 def tearDown(self): 166 if not DEBUG_TEST: 167 for i in self._to_cleanup: 168 if os.path.isdir(i): 169 shutil.rmtree(i, ignore_errors=True) 170 else: 171 os.remove(i) 172 del self._to_cleanup[:] 173 else: 174 print(self._to_cleanup) 175 176 177 def _get_container_files(self, apex_file_path): 178 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_") 179 self._to_cleanup.append(dir_name) 180 with ZipFile(apex_file_path, 'r') as zip_obj: 181 zip_obj.extractall(path=dir_name) 182 files = {} 183 for i in ["apex_manifest.json", "apex_manifest.pb", 184 "apex_build_info.pb", "assets", 185 "apex_payload.img", "apex_payload.zip"]: 186 file_path = os.path.join(dir_name, i) 187 if os.path.exists(file_path): 188 files[i] = file_path 189 self.assertIn("apex_manifest.pb", files) 190 self.assertIn("apex_build_info.pb", files) 191 192 image_file = None 193 if "apex_payload.img" in files: 194 image_file = files["apex_payload.img"] 195 elif "apex_payload.zip" in files: 196 image_file = files["apex_payload.zip"] 197 self.assertIsNotNone(image_file) 198 files["apex_payload"] = image_file 199 200 return files 201 202 def _extract_payload_from_img(self, img_file_path): 203 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_") 204 self._to_cleanup.append(dir_name) 205 cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path] 206 run_host_command(cmd) 207 208 # Remove payload files added by apexer and e2fs tools. 209 for i in ["apex_manifest.json", "apex_manifest.pb"]: 210 if os.path.exists(os.path.join(dir_name, i)): 211 os.remove(os.path.join(dir_name, i)) 212 if os.path.isdir(os.path.join(dir_name, "lost+found")): 213 shutil.rmtree(os.path.join(dir_name, "lost+found")) 214 return dir_name 215 216 def _extract_payload(self, apex_file_path): 217 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_") 218 self._to_cleanup.append(dir_name) 219 cmd = ["deapexer", "extract", apex_file_path, dir_name] 220 run_host_command(cmd) 221 222 # Remove payload files added by apexer and e2fs tools. 223 for i in ["apex_manifest.json", "apex_manifest.pb"]: 224 if os.path.exists(os.path.join(dir_name, i)): 225 os.remove(os.path.join(dir_name, i)) 226 if os.path.isdir(os.path.join(dir_name, "lost+found")): 227 shutil.rmtree(os.path.join(dir_name, "lost+found")) 228 return dir_name 229 230 def _run_apexer(self, container_files, payload_dir, args=[]): 231 unsigned_payload_only = False 232 payload_only = False 233 if "--unsigned_payload_only" in args: 234 unsigned_payload_only = True 235 if unsigned_payload_only or "--payload_only" in args: 236 payload_only = True 237 238 os.environ["APEXER_TOOL_PATH"] = ( 239 "out/soong/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin") 240 cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"] 241 cmd.extend(["--manifest", container_files["apex_manifest.pb"]]) 242 if "apex_manifest.json" in container_files: 243 cmd.extend(["--manifest_json", container_files["apex_manifest.json"]]) 244 cmd.extend(["--build_info", container_files["apex_build_info.pb"]]) 245 if not payload_only and "assets" in container_files: 246 cmd.extend(["--assets_dir", "assets"]) 247 if not unsigned_payload_only: 248 cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)]) 249 cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)]) 250 cmd.extend(args) 251 252 # Decide on output file name 253 apex_suffix = ".apex.unsigned" 254 if payload_only: 255 apex_suffix = ".payload" 256 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix) 257 os.close(fd) 258 self._to_cleanup.append(fn) 259 cmd.extend([payload_dir, fn]) 260 261 run_host_command(cmd) 262 return fn 263 264 def _sign_apk_container(self, unsigned_apex): 265 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex") 266 os.close(fd) 267 self._to_cleanup.append(fn) 268 cmd = [ 269 "prebuilts/jdk/jdk11/linux-x86/bin/java", 270 "-Djava.library.path=out/soong/host/linux-x86/lib64", 271 "-jar", "out/soong/host/linux-x86/framework/signapk.jar", 272 "-a", "4096", 273 os.path.join(get_current_dir(), TEST_X509_KEY), 274 os.path.join(get_current_dir(), TEST_PK8_KEY), 275 unsigned_apex, fn] 276 run_and_check_output(cmd) 277 return fn 278 279 def _sign_payload(self, container_files, unsigned_payload): 280 fd, signed_payload = \ 281 tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload") 282 os.close(fd) 283 self._to_cleanup.append(signed_payload) 284 shutil.copyfile(unsigned_payload, signed_payload) 285 286 cmd = ['avbtool'] 287 cmd.append('add_hashtree_footer') 288 cmd.append('--do_not_generate_fec') 289 cmd.extend(['--algorithm', 'SHA256_RSA4096']) 290 cmd.extend(['--hash_algorithm', 'sha256']) 291 cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)]) 292 manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"]) 293 cmd.extend(['--prop', 'apex.key:' + manifest_apex.name]) 294 # Set up the salt based on manifest content which includes name 295 # and version 296 salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest() 297 cmd.extend(['--salt', salt]) 298 cmd.extend(['--image', signed_payload]) 299 cmd.append('--no_hashtree') 300 run_and_check_output(cmd) 301 302 return signed_payload 303 304 def _verify_payload(self, payload): 305 """Verifies that the payload is properly signed by avbtool""" 306 cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"] 307 run_and_check_output(cmd) 308 309 def _run_build_test(self, apex_name): 310 apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex") 311 if DEBUG_TEST: 312 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex") 313 os.close(fd) 314 shutil.copyfile(apex_file_path, fn) 315 self._to_cleanup.append(fn) 316 container_files = self._get_container_files(apex_file_path) 317 payload_dir = self._extract_payload(apex_file_path) 318 repack_apex_file_path = self._run_apexer(container_files, payload_dir) 319 resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path) 320 self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path)) 321 322 def test_simple_apex(self): 323 self._run_build_test(TEST_APEX) 324 325 def test_legacy_apex(self): 326 self._run_build_test(TEST_APEX_LEGACY) 327 328 def test_output_payload_only(self): 329 """Assert that payload-only output from apexer is same as the payload we get by unzipping 330 apex. 331 """ 332 apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex") 333 container_files = self._get_container_files(apex_file_path) 334 payload_dir = self._extract_payload(apex_file_path) 335 payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"]) 336 self._verify_payload(payload_only_file_path) 337 self.assertEqual(get_sha1sum(payload_only_file_path), 338 get_sha1sum(container_files["apex_payload"])) 339 340 def test_output_unsigned_payload_only(self): 341 """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is 342 same as the payload we get by unzipping apex. 343 """ 344 apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex") 345 container_files = self._get_container_files(apex_file_path) 346 payload_dir = self._extract_payload(apex_file_path) 347 unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir, 348 ["--unsigned_payload_only"]) 349 with self.assertRaises(RuntimeError) as error: 350 self._verify_payload(unsigned_payload_only_file_path) 351 self.assertIn("Given image does not look like a vbmeta image", str(error.exception)) 352 signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path) 353 self.assertEqual(get_sha1sum(signed_payload), 354 get_sha1sum(container_files["apex_payload"])) 355 356 # Now assert that given an unsigned image and the original container 357 # files, we can produce an identical unsigned image. 358 unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path) 359 unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir, 360 ["--unsigned_payload_only"]) 361 self.assertEqual(get_sha1sum(unsigned_payload_only_file_path), 362 get_sha1sum(unsigned_payload_only_2_file_path)) 363 364 def test_apex_with_logging_parent(self): 365 self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT) 366 367 def test_apex_with_overridden_package_name(self): 368 self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME) 369 370 371if __name__ == '__main__': 372 unittest.main(verbosity=2) 373