• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apexer."""
18
19import hashlib
20import logging
21import os
22import re
23import shutil
24import stat
25import subprocess
26import tempfile
27import unittest
28from zipfile import ZipFile
29
30from apex_manifest import ValidateApexManifest
31
32logger = logging.getLogger(__name__)
33
34TEST_APEX = "com.android.example.apex"
35TEST_APEX_LEGACY = "com.android.example-legacy.apex"
36TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
37TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"
38
39TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
40TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
41TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
42TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
43
44def run(args, verbose=None, **kwargs):
45    """Creates and returns a subprocess.Popen object.
46
47    Args:
48      args: The command represented as a list of strings.
49      verbose: Whether the commands should be shown. Default to the global
50          verbosity if unspecified.
51      kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
52          stdin, etc. stdout and stderr will default to subprocess.PIPE and
53          subprocess.STDOUT respectively unless caller specifies any of them.
54          universal_newlines will default to True, as most of the users in
55          releasetools expect string output.
56
57    Returns:
58      A subprocess.Popen object.
59    """
60    if 'stdout' not in kwargs and 'stderr' not in kwargs:
61        kwargs['stdout'] = subprocess.PIPE
62        kwargs['stderr'] = subprocess.STDOUT
63    if 'universal_newlines' not in kwargs:
64        kwargs['universal_newlines'] = True
65    # Don't log any if caller explicitly says so.
66    if DEBUG_TEST:
67        print("\nRunning: \n%s\n" % " ".join(args))
68    if verbose:
69        logger.info("  Running: \"%s\"", " ".join(args))
70    return subprocess.Popen(args, **kwargs)
71
72
73def run_host_command(args, verbose=None, **kwargs):
74    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
75    if host_build_top:
76        host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
77        args[0] = os.path.join(host_command_dir, args[0])
78    return run_and_check_output(args, verbose, **kwargs)
79
80
81def run_and_check_output(args, verbose=None, **kwargs):
82    """Runs the given command and returns the output.
83
84    Args:
85      args: The command represented as a list of strings.
86      verbose: Whether the commands should be shown. Default to the global
87          verbosity if unspecified.
88      kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
89          stdin, etc. stdout and stderr will default to subprocess.PIPE and
90          subprocess.STDOUT respectively unless caller specifies any of them.
91
92    Returns:
93      The output string.
94
95    Raises:
96      ExternalError: On non-zero exit from the command.
97    """
98    proc = run(args, verbose=verbose, **kwargs)
99    output, _ = proc.communicate()
100    if output is None:
101        output = ""
102    # Don't log any if caller explicitly says so.
103    if verbose:
104        logger.info("%s", output.rstrip())
105    if proc.returncode != 0:
106        raise RuntimeError(
107            "Failed to run command '{}' (exit code {}):\n{}".format(
108                args, proc.returncode, output))
109    return output
110
111
112def get_sha1sum(file_path):
113    h = hashlib.sha256()
114
115    with open(file_path, 'rb') as file:
116        while True:
117            # Reading is buffered, so we can read smaller chunks.
118            chunk = file.read(h.block_size)
119            if not chunk:
120                break
121            h.update(chunk)
122
123    return h.hexdigest()
124
125
126def get_current_dir():
127    """Returns the current dir, relative to the script dir."""
128    # The script dir is the one we want, which could be different from pwd.
129    current_dir = os.path.dirname(os.path.realpath(__file__))
130    return current_dir
131
132def round_up(size, unit):
133    assert unit & (unit - 1) == 0
134    return (size + unit - 1) & (~(unit - 1))
135
136# In order to debug test failures, set DEBUG_TEST to True and run the test from
137# local workstation bypassing atest, e.g.:
138# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
139#
140# the test will print out the command used, and the temporary files used by the
141# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
142# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
143# different.
144# A simple script to analyze the differences:
145#
146# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
147# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
148#
149# cd ~/tmp/
150# rm -rf input output
151# mkdir input output
152# unzip ${FILE_INPUT} -d input/
153# unzip ${FILE_OUTPUT} -d output/
154#
155# diff -r input/ output/
156#
157# For analyzing binary diffs I had mild success using the vbindiff utility.
158DEBUG_TEST = False
159
160
161class ApexerRebuildTest(unittest.TestCase):
162    def setUp(self):
163        self._to_cleanup = []
164        self._get_host_tools(os.path.join(get_current_dir(), "apexer_test_host_tools.zip"))
165
166    def tearDown(self):
167        if not DEBUG_TEST:
168            for i in self._to_cleanup:
169                if os.path.isdir(i):
170                    shutil.rmtree(i, ignore_errors=True)
171                else:
172                    os.remove(i)
173            del self._to_cleanup[:]
174        else:
175            print(self._to_cleanup)
176
177    def _get_host_tools(self, host_tools_file_path):
178        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_")
179        self._to_cleanup.append(dir_name)
180        if os.path.isfile(host_tools_file_path):
181            with ZipFile(host_tools_file_path, 'r') as zip_obj:
182                zip_obj.extractall(path=dir_name)
183
184        files = {}
185        for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
186            "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
187            "signapk.jar", "android.jar"]:
188            file_path = os.path.join(dir_name, "bin", i)
189            if os.path.exists(file_path):
190                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR);
191                files[i] = file_path
192            else:
193                files[i] = i
194        self.host_tools = files
195        self.host_tools_path = os.path.join(dir_name, "bin")
196
197        path = os.path.join(dir_name, "bin")
198        if "PATH" in os.environ:
199            path += ":" + os.environ["PATH"]
200        os.environ["PATH"] = path
201
202        ld_library_path = os.path.join(dir_name, "lib64")
203        if "LD_LIBRARY_PATH" in os.environ:
204            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
205        if "ANDROID_HOST_OUT" in os.environ:
206            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
207        os.environ["LD_LIBRARY_PATH"] = ld_library_path
208
209    def _get_container_files(self, apex_file_path):
210        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
211        self._to_cleanup.append(dir_name)
212        with ZipFile(apex_file_path, 'r') as zip_obj:
213            zip_obj.extractall(path=dir_name)
214        files = {}
215        for i in ["apex_manifest.json", "apex_manifest.pb",
216                  "apex_build_info.pb", "assets",
217                  "apex_payload.img", "apex_payload.zip"]:
218            file_path = os.path.join(dir_name, i)
219            if os.path.exists(file_path):
220                files[i] = file_path
221        self.assertIn("apex_manifest.pb", files)
222        self.assertIn("apex_build_info.pb", files)
223
224        image_file = None
225        if "apex_payload.img" in files:
226            image_file = files["apex_payload.img"]
227        elif "apex_payload.zip" in files:
228            image_file = files["apex_payload.zip"]
229        self.assertIsNotNone(image_file)
230        files["apex_payload"] = image_file
231
232        return files
233
234    def _extract_payload_from_img(self, img_file_path):
235        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
236        self._to_cleanup.append(dir_name)
237        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
238        run_host_command(cmd)
239
240        # Remove payload files added by apexer and e2fs tools.
241        for i in ["apex_manifest.json", "apex_manifest.pb"]:
242            if os.path.exists(os.path.join(dir_name, i)):
243                os.remove(os.path.join(dir_name, i))
244        if os.path.isdir(os.path.join(dir_name, "lost+found")):
245            shutil.rmtree(os.path.join(dir_name, "lost+found"))
246        return dir_name
247
248    def _extract_payload(self, apex_file_path):
249        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
250        self._to_cleanup.append(dir_name)
251        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
252            "extract", apex_file_path, dir_name]
253        run_host_command(cmd)
254
255        # Remove payload files added by apexer and e2fs tools.
256        for i in ["apex_manifest.json", "apex_manifest.pb"]:
257            if os.path.exists(os.path.join(dir_name, i)):
258                os.remove(os.path.join(dir_name, i))
259        if os.path.isdir(os.path.join(dir_name, "lost+found")):
260            shutil.rmtree(os.path.join(dir_name, "lost+found"))
261        return dir_name
262
263    def _run_apexer(self, container_files, payload_dir, args=[]):
264        unsigned_payload_only = False
265        payload_only = False
266        if "--unsigned_payload_only" in args:
267            unsigned_payload_only = True
268        if unsigned_payload_only or "--payload_only" in args:
269            payload_only = True
270
271        os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
272            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
273        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
274        if DEBUG_TEST:
275            cmd.append('-v')
276        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
277        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
278        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
279        if "apex_manifest.json" in container_files:
280            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
281        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
282        if not payload_only and "assets" in container_files:
283            cmd.extend(["--assets_dir", container_files["assets"]])
284        if not unsigned_payload_only:
285            cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
286            cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
287        cmd.extend(args)
288
289        # Decide on output file name
290        apex_suffix = ".apex.unsigned"
291        if payload_only:
292            apex_suffix = ".payload"
293        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
294        os.close(fd)
295        self._to_cleanup.append(fn)
296        cmd.extend([payload_dir, fn])
297
298        run_host_command(cmd)
299        return fn
300
301    def _get_java_toolchain(self):
302        java_toolchain = "java"
303        if os.path.isfile("prebuilts/jdk/jdk11/linux-x86/bin/java"):
304            java_toolchain = "prebuilts/jdk/jdk11/linux-x86/bin/java"
305        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
306            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
307        elif "ANDROID_JAVA_HOME" in os.environ:
308            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
309        elif "JAVA_HOME" in os.environ:
310            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")
311
312        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
313        if "ANDROID_HOST_OUT" in os.environ:
314            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
315        if "ANDROID_BUILD_TOP" in os.environ:
316            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
317                "out/host/linux-x86/lib64")
318
319        return [java_toolchain, java_dep_lib]
320
321    def _sign_apk_container(self, unsigned_apex):
322        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
323        os.close(fd)
324        self._to_cleanup.append(fn)
325        java_toolchain, java_dep_lib = self._get_java_toolchain()
326        cmd = [
327            java_toolchain,
328            "-Djava.library.path=" + java_dep_lib,
329            "-jar", self.host_tools['signapk.jar'],
330            "-a", "4096", "--align-file-size",
331            os.path.join(get_current_dir(), TEST_X509_KEY),
332            os.path.join(get_current_dir(), TEST_PK8_KEY),
333            unsigned_apex, fn]
334        run_and_check_output(cmd)
335        return fn
336
337    def _sign_payload(self, container_files, unsigned_payload):
338        fd, signed_payload = \
339            tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
340        os.close(fd)
341        self._to_cleanup.append(signed_payload)
342        shutil.copyfile(unsigned_payload, signed_payload)
343
344        cmd = ['avbtool']
345        cmd.append('add_hashtree_footer')
346        cmd.append('--do_not_generate_fec')
347        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
348        cmd.extend(['--hash_algorithm', 'sha256'])
349        cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
350        manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"])
351        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
352        # Set up the salt based on manifest content which includes name
353        # and version
354        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
355        cmd.extend(['--salt', salt])
356        cmd.extend(['--image', signed_payload])
357        cmd.append('--no_hashtree')
358        run_and_check_output(cmd)
359
360        return signed_payload
361
362    def _verify_payload(self, payload):
363        """Verifies that the payload is properly signed by avbtool"""
364        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
365        run_and_check_output(cmd)
366
367    def _run_build_test(self, apex_name):
368        apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
369        if DEBUG_TEST:
370            fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
371            os.close(fd)
372            shutil.copyfile(apex_file_path, fn)
373            self._to_cleanup.append(fn)
374        container_files = self._get_container_files(apex_file_path)
375        payload_dir = self._extract_payload(apex_file_path)
376        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
377        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
378        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))
379
380    def test_simple_apex(self):
381        self._run_build_test(TEST_APEX)
382
383    def test_legacy_apex(self):
384        self._run_build_test(TEST_APEX_LEGACY)
385
386    def test_output_payload_only(self):
387        """Assert that payload-only output from apexer is same as the payload we get by unzipping
388        apex.
389        """
390        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
391        container_files = self._get_container_files(apex_file_path)
392        payload_dir = self._extract_payload(apex_file_path)
393        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
394        self._verify_payload(payload_only_file_path)
395        self.assertEqual(get_sha1sum(payload_only_file_path),
396                         get_sha1sum(container_files["apex_payload"]))
397
398    def test_output_unsigned_payload_only(self):
399        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
400        same as the payload we get by unzipping apex.
401        """
402        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
403        container_files = self._get_container_files(apex_file_path)
404        payload_dir = self._extract_payload(apex_file_path)
405        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
406                                                  ["--unsigned_payload_only"])
407        with self.assertRaises(RuntimeError) as error:
408            self._verify_payload(unsigned_payload_only_file_path)
409        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
410        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
411        self.assertEqual(get_sha1sum(signed_payload),
412                         get_sha1sum(container_files["apex_payload"]))
413
414        # Now assert that given an unsigned image and the original container
415        # files, we can produce an identical unsigned image.
416        unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
417        unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
418                                                             ["--unsigned_payload_only"])
419        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
420                         get_sha1sum(unsigned_payload_only_2_file_path))
421
422    def test_apex_with_logging_parent(self):
423      self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)
424
425    def test_apex_with_overridden_package_name(self):
426      self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)
427
428if __name__ == '__main__':
429    unittest.main(verbosity=2)
430