• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apexer."""
18
19import hashlib
20import logging
21import os
22import re
23import shutil
24import subprocess
25import tempfile
26import unittest
27from zipfile import ZipFile
28
29from apex_manifest import ValidateApexManifest
30
# Module-level logger; run() and run_and_check_output() log through it when
# their `verbose` argument is truthy.
logger = logging.getLogger(__name__)

# Names of the prebuilt test APEXes. The tests append ".apex" to each name and
# look the resulting file up in the directory that contains this script.
TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

# Signing material, given relative to the script directory:
#  - TEST_PRIVATE_KEY is passed as --key to apexer and avbtool,
#  - TEST_AVB_PUBLIC_KEY is passed as --pubkey to apexer,
#  - TEST_X509_KEY and TEST_PK8_KEY are passed to signapk.
TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
42
43
def run(args, verbose=None, **kwargs):
    """Starts a command as a child process and returns without waiting.

    Args:
      args: The command, as a list of strings.
      verbose: When truthy, the command line is logged at INFO level.
      kwargs: Extra keyword arguments forwarded to subprocess.Popen(), such as
          env or stdin. If the caller passes neither stdout nor stderr, stdout
          is piped and stderr is merged into it. universal_newlines defaults
          to True so that the captured output is text.

    Returns:
      The started subprocess.Popen object.
    """
    caller_chose_streams = 'stdout' in kwargs or 'stderr' in kwargs
    if not caller_chose_streams:
        kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    kwargs.setdefault('universal_newlines', True)

    # In debug mode the command is always echoed to stdout, independent of
    # the caller's verbosity setting.
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    if verbose:
        logger.info('  Running: "%s"', " ".join(args))

    return subprocess.Popen(args, **kwargs)
71
72
def run_host_command(args, verbose=None, **kwargs):
    """Runs a host tool, preferring the copy built under ANDROID_BUILD_TOP.

    When the ANDROID_BUILD_TOP environment variable is set, the executable
    name in args[0] is resolved against
    $ANDROID_BUILD_TOP/out/soong/host/linux-x86/bin; otherwise the command is
    run as given.

    Args:
      args: The command, as a sequence of strings. It is not modified.
      verbose: Forwarded to run_and_check_output().
      kwargs: Forwarded to run_and_check_output().

    Returns:
      The output string of the command.

    Raises:
      RuntimeError: On non-zero exit from the command.
    """
    # Work on a copy so the caller's list is left untouched (and so that
    # tuples are accepted as well).
    command = list(args)
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        host_command_dir = os.path.join(host_build_top, "out/soong/host/linux-x86/bin")
        command[0] = os.path.join(host_command_dir, command[0])
    return run_and_check_output(command, verbose, **kwargs)
79
80
def run_and_check_output(args, verbose=None, **kwargs):
    """Runs a command to completion and returns what it printed.

    Args:
      args: The command, as a list of strings.
      verbose: When truthy, the command and its output are logged at INFO
          level.
      kwargs: Extra keyword arguments forwarded to run() and from there to
          subprocess.Popen(), such as env or stdin.

    Returns:
      The captured output as a string; an empty string if nothing was
      captured.

    Raises:
      RuntimeError: If the command exits with a non-zero status. The message
          contains the command, the exit code and the captured output.
    """
    proc = run(args, verbose=verbose, **kwargs)
    captured = proc.communicate()[0]
    # communicate() yields None when stdout was not piped.
    output = "" if captured is None else captured

    if verbose:
        logger.info("%s", output.rstrip())

    if proc.returncode == 0:
        return output
    raise RuntimeError(
        "Failed to run command '{}' (exit code {}):\n{}".format(
            args, proc.returncode, output))
110
111
def get_sha1sum(file_path):
    """Returns the hex digest of a file's contents.

    Note: despite the historical name, the digest is SHA-256, not SHA-1.
    Callers only compare digests with each other, so the name is kept for
    compatibility.

    Args:
      file_path: Path of the file to hash.

    Returns:
      The SHA-256 digest of the file as a lowercase hex string.
    """
    h = hashlib.sha256()
    # Read in 64 KiB chunks: large enough to keep the number of Python-level
    # read() calls small, small enough not to load a whole APEX into memory.
    chunk_size = 64 * 1024

    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)

    return h.hexdigest()
124
125
def get_current_dir():
    """Returns the directory that contains this script.

    The path is derived from the resolved location of this file rather than
    from the process working directory, because the two can differ.
    """
    script_path = os.path.realpath(__file__)
    return os.path.dirname(script_path)
131
def round_up(size, unit):
    """Rounds size up to the nearest multiple of unit.

    Args:
      size: The integer value to round.
      unit: The alignment; must be a positive power of two.

    Returns:
      The smallest multiple of unit that is greater than or equal to size.
    """
    # A zero unit would satisfy the power-of-two bit test below and make the
    # function silently return 0, so it is rejected explicitly.
    assert unit > 0 and unit & (unit - 1) == 0, "unit must be a positive power of two"
    return (size + unit - 1) & (~(unit - 1))
135
# To debug test failures, set DEBUG_TEST to True and run the test from a
# local workstation, bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# The test then prints every command it runs and the list of temporary files
# it used, and leaves those files in place instead of deleting them. Compare
# e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to see where they differ.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs, the vbindiff utility has been mildly helpful.
DEBUG_TEST = False
159
160
class ApexerRebuildTest(unittest.TestCase):
    """Checks that apexer can rebuild the prebuilt test APEXes.

    Each test unpacks a prebuilt APEX, feeds the extracted pieces back through
    apexer (and, where needed, the signing tools), and compares the digest of
    the result with the digest of the original.
    """

    def setUp(self):
        # Temporary files and directories created by the running test; they
        # are removed again in tearDown().
        self._to_cleanup = []

    def tearDown(self):
        """Removes the temporary artifacts, or lists them in debug mode."""
        if DEBUG_TEST:
            # Keep the artifacts for inspection and tell the developer where
            # they are.
            print(self._to_cleanup)
            return
        for path in self._to_cleanup:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            elif os.path.lexists(path):
                # A tool may already have removed its output file; a missing
                # file must not abort the cleanup of the remaining entries.
                os.remove(path)
        del self._to_cleanup[:]

    def _make_temp_dir(self, label):
        """Creates a temp dir named after the test and schedules its removal."""
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + label)
        self._to_cleanup.append(dir_name)
        return dir_name

    def _make_temp_file(self, label, suffix):
        """Creates an empty temp file named after the test and schedules its removal."""
        fd, file_name = tempfile.mkstemp(prefix=self._testMethodName + label, suffix=suffix)
        os.close(fd)
        self._to_cleanup.append(file_name)
        return file_name

    def _remove_generated_payload_files(self, dir_name):
        """Deletes payload entries added by apexer and the e2fs tools."""
        for name in ["apex_manifest.json", "apex_manifest.pb"]:
            path = os.path.join(dir_name, name)
            if os.path.exists(path):
                os.remove(path)
        lost_found = os.path.join(dir_name, "lost+found")
        if os.path.isdir(lost_found):
            shutil.rmtree(lost_found)

    def _get_container_files(self, apex_file_path):
        """Unzips an APEX container and returns the paths of its known entries.

        Args:
          apex_file_path: Path of the .apex file to unzip.

        Returns:
          A dict mapping each known entry name that exists in the container to
          its extracted path, plus the key "apex_payload" pointing at the
          payload image (apex_payload.img, or apex_payload.zip if there is no
          image). The test fails if the manifest, the build info or the
          payload is missing.
        """
        dir_name = self._make_temp_dir("_container_files_")
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)

        files = {}
        for name in ["apex_manifest.json", "apex_manifest.pb",
                     "apex_build_info.pb", "assets",
                     "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, name)
            if os.path.exists(file_path):
                files[name] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        # Prefer the filesystem image; fall back to the zip payload.
        image_file = files.get("apex_payload.img")
        if image_file is None:
            image_file = files.get("apex_payload.zip")
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _extract_payload_from_img(self, img_file_path):
        """Dumps the contents of a payload image into a fresh temp dir.

        Uses debugfs_static to copy the image contents out, then drops the
        entries that apexer and the e2fs tools add on their own.

        Returns:
          The directory holding the extracted payload.
        """
        dir_name = self._make_temp_dir("_extracted_payload_")
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)
        self._remove_generated_payload_files(dir_name)
        return dir_name

    def _extract_payload(self, apex_file_path):
        """Extracts the payload of an APEX file into a fresh temp dir.

        Uses `deapexer extract`, then drops the entries that apexer and the
        e2fs tools add on their own.

        Returns:
          The directory holding the extracted payload.
        """
        dir_name = self._make_temp_dir("_extracted_payload_")
        cmd = ["deapexer", "extract", apex_file_path, dir_name]
        run_host_command(cmd)
        self._remove_generated_payload_files(dir_name)
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=None):
        """Repacks a payload directory with apexer.

        Args:
          container_files: Dict as returned by _get_container_files().
          payload_dir: Directory with the payload contents to pack.
          args: Optional list of extra apexer flags. "--payload_only" and
              "--unsigned_payload_only" also change which other flags are
              passed and the suffix of the output file.

        Returns:
          Path of the file written by apexer.
        """
        # None instead of a mutable [] default, so no list object is shared
        # between calls.
        if args is None:
            args = []
        unsigned_payload_only = "--unsigned_payload_only" in args
        payload_only = unsigned_payload_only or "--payload_only" in args

        # NOTE: this is set process-wide and therefore stays in effect for
        # every command started afterwards.
        os.environ["APEXER_TOOL_PATH"] = (
            "out/soong/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            # Pass the extracted assets directory, not a path relative to the
            # working directory.
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
        cmd.extend(args)

        # Decide on output file name
        apex_suffix = ".payload" if payload_only else ".apex.unsigned"
        fn = self._make_temp_file("_repacked_", apex_suffix)
        cmd.extend([payload_dir, fn])

        run_host_command(cmd)
        return fn

    def _sign_apk_container(self, unsigned_apex):
        """Signs an unsigned APEX container with signapk.

        Returns:
          Path of the signed .apex file.
        """
        fn = self._make_temp_file("_repacked_", ".apex")
        cmd = [
            "prebuilts/jdk/jdk11/linux-x86/bin/java",
            "-Djava.library.path=out/soong/host/linux-x86/lib64",
            "-jar", "out/soong/host/linux-x86/framework/signapk.jar",
            "-a", "4096",
            os.path.join(get_current_dir(), TEST_X509_KEY),
            os.path.join(get_current_dir(), TEST_PK8_KEY),
            unsigned_apex, fn]
        run_and_check_output(cmd)
        return fn

    def _sign_payload(self, container_files, unsigned_payload):
        """Signs a copy of an unsigned payload image with avbtool.

        Args:
          container_files: Dict as returned by _get_container_files(); its
              manifest supplies the key name and the salt.
          unsigned_payload: Path of the unsigned payload image. It is copied,
              not modified.

        Returns:
          Path of the signed copy.
        """
        signed_payload = self._make_temp_file("_repacked_", ".payload")
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
        manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"])
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        cmd.append('--no_hashtree')
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        """Rebuilds and re-signs the named APEX and expects an identical file."""
        apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
        if DEBUG_TEST:
            # Keep a copy of the input next to the outputs for easy diffing.
            fn = self._make_temp_file("_input_", ".apex")
            shutil.copyfile(apex_file_path, fn)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                           ["--unsigned_payload_only"])
        # An unsigned image must be rejected by the verifier.
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

        # Now assert that given an unsigned image and the original container
        # files, we can produce an identical unsigned image.
        unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
        unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
                                                             ["--unsigned_payload_only"])
        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                         get_sha1sum(unsigned_payload_only_2_file_path))

    def test_apex_with_logging_parent(self):
        self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
        self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)
369
370
# When executed as a script, run all tests and print one result line per test.
if __name__ == '__main__':
    unittest.main(verbosity=2)
373