• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apexer."""
18
19import hashlib
20import logging
21import os
22import shutil
23import stat
24import subprocess
25import tempfile
26import unittest
27from zipfile import ZipFile
28
29from apex_manifest import ValidateApexManifest
30from apex_manifest import ParseApexManifest
31
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Names of the test APEXes. The build tests append ".apex" to these and look
# the resulting file up in the directory that contains this script.
TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

# Key material used when rebuilding and re-signing. The paths are relative to
# the directory that contains this script and are joined with it before use.
# TEST_PRIVATE_KEY is passed as --key to apexer and avbtool, TEST_AVB_PUBLIC_KEY
# as --pubkey to apexer, and the X509/PK8 pair to signapk.jar.
TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
43
def run(args, verbose=None, **kwargs):
    """Starts a command as a child process and returns its Popen handle.

    The process is only started here; waiting for it and collecting its output
    is left to the caller.

    Args:
      args: The command represented as a list of strings.
      verbose: When truthy, the command line is logged at INFO level.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as
          env, stdin, etc. When the caller supplies neither stdout nor stderr,
          stdout becomes subprocess.PIPE and stderr is folded into it with
          subprocess.STDOUT. universal_newlines defaults to True so that the
          output is text rather than bytes.

    Returns:
      A subprocess.Popen object.
    """
    caller_redirects = 'stdout' in kwargs or 'stderr' in kwargs
    if not caller_redirects:
        kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    kwargs.setdefault('universal_newlines', True)

    # In debug mode the command is always echoed to stdout, independently of
    # the verbose flag.
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    # Logging happens only when the caller asked for it.
    if verbose:
        logger.info("  Running: \"%s\"", " ".join(args))

    return subprocess.Popen(args, **kwargs)
71
72
def run_host_command(args, verbose=None, **kwargs):
    """Runs a host tool, preferring the copy built in the Android tree.

    When the ANDROID_BUILD_TOP environment variable is set (and non-empty), the
    executable name in args[0] is resolved against
    $ANDROID_BUILD_TOP/out/host/linux-x86/bin; otherwise the command is run as
    given.

    Args:
      args: The command represented as a list of strings. The list is not
          modified; a copy is made before the executable path is rewritten.
      verbose: Forwarded to run_and_check_output().
      kwargs: Forwarded to run_and_check_output().

    Returns:
      Whatever run_and_check_output() returns for the command.
    """
    # Work on a copy so that the caller's list is left untouched.
    command = list(args)
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
        command[0] = os.path.join(host_command_dir, command[0])
    return run_and_check_output(command, verbose, **kwargs)
79
80
def run_and_check_output(args, verbose=None, **kwargs):
    """Runs a command to completion and returns what it wrote to stdout.

    Args:
      args: The command represented as a list of strings.
      verbose: When truthy, the captured output is logged at INFO level (and
          the command line is logged by run()).
      kwargs: Any additional args to be passed on to run(), and from there to
          subprocess.Popen().

    Returns:
      The captured stdout of the command, or "" when nothing was captured.

    Raises:
      RuntimeError: On non-zero exit from the command. The message carries the
          command, the exit code and the captured output.
    """
    proc = run(args, verbose=verbose, **kwargs)
    captured, _unused_stderr = proc.communicate()
    # communicate() reports None for a stream that was not piped.
    output = "" if captured is None else captured

    if verbose:
        logger.info("%s", output.rstrip())

    if proc.returncode == 0:
        return output
    raise RuntimeError(
        "Failed to run command '{}' (exit code {}):\n{}".format(
            args, proc.returncode, output))
110
111
def get_sha1sum(file_path):
    """Returns the hex digest of a file's contents.

    NOTE: despite the historical name, the digest is SHA-256, not SHA-1. The
    name is kept so that existing callers continue to work.

    Args:
      file_path: Path of the file to hash.

    Returns:
      The SHA-256 digest of the file contents as a lowercase hex string.
    """
    digest = hashlib.sha256()
    # Read in reasonably large chunks: bounded memory use without issuing one
    # read call per 64-byte hash block.
    chunk_size = 64 * 1024

    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)

    return digest.hexdigest()
124
125
def get_current_dir():
    """Returns the absolute directory that contains this script.

    Symlinks in the script path are resolved first. The result is independent
    of the process's working directory.
    """
    return os.path.dirname(os.path.realpath(__file__))
131
def round_up(size, unit):
    """Rounds size up to the next multiple of unit.

    The computation is a bit mask, so unit is asserted to satisfy
    unit & (unit - 1) == 0, which holds for powers of two.
    """
    mask = unit - 1
    assert (unit & mask) == 0
    return (size + mask) & ~mask
135
# To debug test failures, set DEBUG_TEST to True and run the test from a local
# workstation, bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# With DEBUG_TEST enabled, the test prints every command it runs and, at
# teardown, prints the list of temporary files and directories it created
# instead of deleting them. To find out where a rebuilt APEX differs from its
# input, compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex.
#
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs, the vbindiff utility has been of some help.
DEBUG_TEST = False
159
160
class ApexerRebuildTest(unittest.TestCase):
    """Checks that test APEXes can be taken apart and rebuilt identically.

    Each test unpacks a test APEX, rebuilds it (or only its payload) with
    apexer from the extracted pieces, and compares digests of the original and
    the rebuilt artifact.
    """

    # Environment variables that this fixture overwrites. Their values are
    # recorded in setUp() and put back once the test is over, so that one test
    # does not leak (soon to be deleted) temp directories into the next.
    _MANAGED_ENV_VARS = ("PATH", "LD_LIBRARY_PATH", "APEXER_TOOL_PATH")

    def setUp(self):
        """Records the environment and unpacks the bundled host tools."""
        self._to_cleanup = []
        self._saved_env = {name: os.environ.get(name) for name in self._MANAGED_ENV_VARS}
        # Registered before anything is modified, so the environment is
        # restored even when the remainder of setUp() fails.
        self.addCleanup(self._restore_environment)
        self._get_host_tools(os.path.join(get_current_dir(), "apexer_test_host_tools.zip"))

    def tearDown(self):
        """Deletes the temp files and dirs of the test, unless debugging."""
        if DEBUG_TEST:
            # Leave everything on disk for inspection and say where it is.
            print(self._to_cleanup)
            return
        for path in self._to_cleanup:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            elif os.path.exists(path):
                # A tool may already have removed its output file; a missing
                # file must not turn into a teardown error.
                os.remove(path)
        del self._to_cleanup[:]

    def _restore_environment(self):
        """Puts the variables in _MANAGED_ENV_VARS back to their setUp() values."""
        for name, value in self._saved_env.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value

    def _make_temp_dir(self, tag):
        """Creates a temp dir prefixed with the test name and tag; registers it for cleanup."""
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + tag)
        self._to_cleanup.append(dir_name)
        return dir_name

    def _make_temp_file(self, tag, suffix):
        """Creates an empty, closed temp file and registers it for cleanup."""
        fd, file_name = tempfile.mkstemp(prefix=self._testMethodName + tag, suffix=suffix)
        os.close(fd)
        self._to_cleanup.append(file_name)
        return file_name

    @staticmethod
    def _remove_generated_payload_entries(payload_dir):
        """Removes payload files added by apexer and e2fs tools from payload_dir."""
        for name in ["apex_manifest.json", "apex_manifest.pb"]:
            manifest_path = os.path.join(payload_dir, name)
            if os.path.exists(manifest_path):
                os.remove(manifest_path)
        lost_and_found = os.path.join(payload_dir, "lost+found")
        if os.path.isdir(lost_and_found):
            shutil.rmtree(lost_and_found)

    def _get_host_tools(self, host_tools_file_path):
        """Unpacks the host tools zip (if present) and points the environment at it.

        Sets self.host_tools (tool name -> path of the unpacked tool, or the
        bare tool name when the zip did not provide it) and
        self.host_tools_path, and prepends the unpacked bin/ and lib64/
        directories to PATH and LD_LIBRARY_PATH respectively.

        Args:
          host_tools_file_path: Path of the zip file with the host tools. A
              missing file is tolerated; all tools then fall back to their
              bare names.
        """
        dir_name = self._make_temp_dir("_host_tools_")
        if os.path.isfile(host_tools_file_path):
            with ZipFile(host_tools_file_path, 'r') as zip_obj:
                zip_obj.extractall(path=dir_name)

        bin_dir = os.path.join(dir_name, "bin")
        files = {}
        for tool in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile",
                     "e2fsdroid", "resize2fs", "soong_zip", "aapt2", "merge_zips",
                     "zipalign", "debugfs_static", "signapk.jar", "android.jar", "blkid",
                     "fsck.erofs"]:
            file_path = os.path.join(bin_dir, tool)
            if os.path.exists(file_path):
                # Make sure the unpacked file is readable and executable by the owner.
                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
                files[tool] = file_path
            else:
                files[tool] = tool
        self.host_tools = files
        self.host_tools_path = bin_dir

        path = bin_dir
        if "PATH" in os.environ:
            path += ":" + os.environ["PATH"]
        os.environ["PATH"] = path

        ld_library_path = os.path.join(dir_name, "lib64")
        if "LD_LIBRARY_PATH" in os.environ:
            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        os.environ["LD_LIBRARY_PATH"] = ld_library_path

    def _get_container_files(self, apex_file_path):
        """Unzips an APEX container and returns the paths of its known entries.

        Args:
          apex_file_path: Path of the APEX (zip) file.

        Returns:
          A dict mapping each entry name found in the container to the path it
          was extracted to. The dict always holds "apex_manifest.pb" and
          "apex_build_info.pb" (asserted), plus "apex_payload", which points at
          apex_payload.img if present and at apex_payload.zip otherwise.
        """
        dir_name = self._make_temp_dir("_container_files_")
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)
        files = {}
        for entry in ["apex_manifest.json", "apex_manifest.pb",
                      "apex_build_info.pb", "assets",
                      "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, entry)
            if os.path.exists(file_path):
                files[entry] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        image_file = None
        if "apex_payload.img" in files:
            image_file = files["apex_payload.img"]
        elif "apex_payload.zip" in files:
            image_file = files["apex_payload.zip"]
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _extract_payload_from_img(self, img_file_path):
        """Dumps a payload image into a fresh temp dir using debugfs_static.

        Args:
          img_file_path: Path of the payload image.

        Returns:
          The directory holding the extracted payload, without the manifest
          files and lost+found.
        """
        dir_name = self._make_temp_dir("_extracted_payload_")
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)

        self._remove_generated_payload_entries(dir_name)
        return dir_name

    def _extract_payload(self, apex_file_path):
        """Extracts the payload of an APEX into a fresh temp dir using deapexer.

        Args:
          apex_file_path: Path of the APEX file.

        Returns:
          The directory holding the extracted payload, without the manifest
          files and lost+found.
        """
        dir_name = self._make_temp_dir("_extracted_payload_")
        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
               "--blkid_path", self.host_tools["blkid"], "--fsckerofs_path",
               self.host_tools["fsck.erofs"], "extract", apex_file_path, dir_name]
        run_host_command(cmd)

        self._remove_generated_payload_entries(dir_name)
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=None):
        """Rebuilds an APEX (or only its payload) with apexer.

        Args:
          container_files: Dict as returned by _get_container_files().
          payload_dir: Directory with the payload contents to pack.
          args: Optional list of extra apexer flags. "--payload_only" and
              "--unsigned_payload_only" additionally change which inputs are
              passed and the suffix of the output file.

        Returns:
          Path of the file written by apexer.
        """
        # None instead of a list literal as the default: a mutable default
        # would be shared between all calls.
        args = [] if args is None else list(args)

        unsigned_payload_only = "--unsigned_payload_only" in args
        payload_only = unsigned_payload_only or "--payload_only" in args

        os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        if DEBUG_TEST:
            cmd.append('-v')
        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
        cmd.extend(args)

        # Decide on output file name
        apex_suffix = ".payload" if payload_only else ".apex.unsigned"
        output_path = self._make_temp_file("_repacked_", apex_suffix)
        cmd.extend([payload_dir, output_path])

        run_host_command(cmd)
        return output_path

    def _get_java_toolchain(self):
        """Picks the java binary and the native library path for signapk.

        The java binary is the first match of: the jdk17 prebuilt relative to
        the working directory, /jdk/jdk17, $ANDROID_JAVA_TOOLCHAIN,
        $ANDROID_JAVA_HOME, $JAVA_HOME, and finally plain "java".

        Returns:
          A two-element list [java_binary, library_path], where library_path is
          LD_LIBRARY_PATH extended with the host lib64 directories named by
          ANDROID_HOST_OUT and ANDROID_BUILD_TOP, when those are set.
        """
        java_toolchain = "java"
        if os.path.isfile("prebuilts/jdk/jdk17/linux-x86/bin/java"):
            java_toolchain = "prebuilts/jdk/jdk17/linux-x86/bin/java"
        elif os.path.isfile("/jdk/jdk17/linux-x86/bin/java"):
            java_toolchain = "/jdk/jdk17/linux-x86/bin/java"
        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
        elif "ANDROID_JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
        elif "JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

        # LD_LIBRARY_PATH is always set by _get_host_tools() during setUp().
        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        if "ANDROID_BUILD_TOP" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                                               "out/host/linux-x86/lib64")

        return [java_toolchain, java_dep_lib]

    def _sign_apk_container(self, unsigned_apex):
        """Signs an unsigned APEX container with signapk.jar and the test keys.

        Args:
          unsigned_apex: Path of the unsigned APEX.

        Returns:
          Path of the signed APEX.
        """
        signed_apex = self._make_temp_file("_repacked_", ".apex")
        java_toolchain, java_dep_lib = self._get_java_toolchain()
        cmd = [
            java_toolchain,
            "-Djava.library.path=" + java_dep_lib,
            "-jar", self.host_tools['signapk.jar'],
            "-a", "4096", "--align-file-size",
            os.path.join(get_current_dir(), TEST_X509_KEY),
            os.path.join(get_current_dir(), TEST_PK8_KEY),
            unsigned_apex, signed_apex]
        run_and_check_output(cmd)
        return signed_apex

    def _sign_payload(self, container_files, unsigned_payload):
        """Signs a copy of an unsigned payload image with avbtool.

        Args:
          container_files: Dict as returned by _get_container_files(); its
              manifest supplies the apex.key property and the salt.
          unsigned_payload: Path of the unsigned payload image. It is left
              untouched; the signature is added to a copy.

        Returns:
          Path of the signed copy.
        """
        signed_payload = self._make_temp_file("_repacked_", ".payload")
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
        manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
        ValidateApexManifest(manifest_apex)
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        cmd.append('--no_hashtree')
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        """Rebuilds and re-signs <apex_name>.apex and expects an identical file.

        Args:
          apex_name: File name of the test APEX without the trailing ".apex",
              looked up in the directory of this script.
        """
        apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
        if DEBUG_TEST:
            # Keep a copy of the input next to the outputs, for easy diffing.
            input_copy = self._make_temp_file("_input_", ".apex")
            shutil.copyfile(apex_file_path, input_copy)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                           ["--unsigned_payload_only"])
        # An unsigned payload must be rejected by the verifier.
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

        # Now assert that given an unsigned image and the original container
        # files, we can produce an identical unsigned image.
        unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
        unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
                                                             ["--unsigned_payload_only"])
        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                         get_sha1sum(unsigned_payload_only_2_file_path))

    def test_apex_with_logging_parent(self):
        self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
        self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)
431
432
433if __name__ == '__main__':
434    unittest.main(verbosity=2)
435