• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unit tests for apexer_with_DCLA_preprocessing."""
18import hashlib
19import logging
20import os
21import shutil
22import stat
23import subprocess
24import tempfile
25from typing import List
26import unittest
27import zipfile
28
29TEST_PRIVATE_KEY = os.path.join('testdata', 'com.android.example.apex.pem')
30TEST_APEX = 'com.android.example.apex'
31
32# In order to debug test failures, set DEBUG_TEST to True and run the test from
33# local workstation bypassing atest, e.g.:
34# $ m apexer_with_DCLA_preprocessing_test && \
35#   out/host/linux-x86/nativetest64/apexer_with_DCLA_preprocessing_test/\
36#   apexer_with_DCLA_preprocessing_test
37#
38# the test will print out the command used, and the temporary files used by the
39# test.
40DEBUG_TEST = False
41
42def get_current_dir():
43  """returns the current dir, relative to the script dir."""
44  # The script dir is the one we want, which could be different from pwd.
45  current_dir = os.path.dirname(os.path.realpath(__file__))
46  return current_dir
47
48# TODO: consolidate these common test utilities into a common python_library_host
49# to be shared across tests under system/apex
50def run_command(cmd: List[str]) -> None:
51  """Run a command."""
52  try:
53    if DEBUG_TEST:
54      cmd_str = ' '.join(cmd)
55      print(f'\nRunning: \n{cmd_str}\n')
56    res = subprocess.run(
57        cmd,
58        check=True,
59        stdout=subprocess.PIPE,
60        universal_newlines=True,
61        stderr=subprocess.PIPE)
62  except subprocess.CalledProcessError as err:
63    print(err.stderr)
64    print(err.output)
65    raise err
66
67def get_apexer_with_DCLA_preprocessing() -> str:
68  tool_binary = os.path.join(get_current_dir(), 'apexer_with_DCLA_preprocessing')
69  if not os.path.isfile(tool_binary):
70    raise FileNotFoundError(f'cannot find tooling apexer with DCLA preprocessing')
71  else:
72    os.chmod(tool_binary, stat.S_IRUSR | stat.S_IXUSR);
73    return tool_binary
74
75def get_host_tool(tool_name: str) -> str:
76  """get host tools."""
77  tool_binary = os.path.join(get_current_dir(), 'bin', tool_name)
78  if not os.path.isfile(tool_binary):
79    host_build_top = os.environ.get('ANDROID_BUILD_TOP')
80    if host_build_top:
81      host_command_dir = os.path.join(host_build_top, 'out/host/linux-x86/bin')
82      tool_binary = os.path.join(host_command_dir, tool_name)
83      if not os.path.isfile(tool_binary):
84        host_command_dir = os.path.join(host_build_top, 'prebuilts/sdk/current/public')
85        tool_binary = os.path.join(host_command_dir, tool_name)
86    else:
87      tool_binary = shutil.which(tool_name)
88
89  if not tool_binary or not os.path.isfile(tool_binary):
90    raise FileNotFoundError(f'cannot find tooling {tool_name}')
91  else:
92    return tool_binary
93
94def get_digest(file_path: str) -> str:
95  """Get sha512 digest of a file """
96  digester = hashlib.sha512()
97  with open(file_path, 'rb') as f:
98    bytes_to_digest = f.read()
99    digester.update(bytes_to_digest)
100    return digester.hexdigest()
101
102class ApexerWithDCLAPreprocessingTest(unittest.TestCase):
103
104  def setUp(self):
105    self._to_cleanup = []
106    tools_zip_file = os.path.join(get_current_dir(), 'apexer_test_host_tools.zip')
107    self.unzip_host_tools(tools_zip_file)
108
109  def tearDown(self):
110    if not DEBUG_TEST:
111      for i in self._to_cleanup:
112        if os.path.isdir(i):
113          shutil.rmtree(i, ignore_errors=True)
114        else:
115          os.remove(i)
116      del self._to_cleanup[:]
117    else:
118      print('Cleanup: ' + str(self._to_cleanup))
119
120  def create_temp_dir(self) -> str:
121    tmp_dir = tempfile.mkdtemp()
122    self._to_cleanup.append(tmp_dir)
123    return tmp_dir
124
125  def expand_apex(self, apex_file: str) -> None:
126    """expand an apex file include apex_payload."""
127    apex_dir = self.create_temp_dir()
128    with zipfile.ZipFile(apex_file, 'r') as apex_zip:
129      apex_zip.extractall(apex_dir)
130    payload_img = os.path.join(apex_dir, 'apex_payload.img')
131    extract_dir = os.path.join(apex_dir, 'payload_extract')
132    os.mkdir(extract_dir)
133    debugfs = get_host_tool('debugfs_static')
134    run_command([debugfs, payload_img, '-R', f'rdump / {extract_dir}'])
135
136    # remove /etc and /lost+found and /payload_extract/apex_manifest.pb
137    lost_and_found = os.path.join(extract_dir, 'lost+found')
138    etc_dir = os.path.join(extract_dir, 'etc')
139    os.remove(os.path.join(extract_dir, 'apex_manifest.pb'))
140    if os.path.isdir(lost_and_found):
141      shutil.rmtree(lost_and_found)
142    if os.path.isdir(etc_dir):
143      shutil.rmtree(etc_dir)
144
145    return apex_dir
146
147  def unzip_host_tools(self, host_tools_file_path: str) -> None:
148    dir_name = get_current_dir()
149    if os.path.isfile(host_tools_file_path):
150      with zipfile.ZipFile(host_tools_file_path, 'r') as zip_obj:
151        zip_obj.extractall(dir_name)
152
153    for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
154      "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
155      "signapk.jar", "android.jar"]:
156      file_path = os.path.join(dir_name, "bin", i)
157      if os.path.exists(file_path):
158        os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR);
159
160  def test_DCLA_preprocessing(self):
161    """test DCLA preprocessing done properly."""
162    apex_file = os.path.join(get_current_dir(), TEST_APEX + '.apex')
163    apex_dir = self.expand_apex(apex_file)
164
165    # create apex canned_fs_config file, TEST_APEX does not come with one
166    canned_fs_config_file = os.path.join(apex_dir, 'canned_fs_config')
167    with open(canned_fs_config_file, 'w') as f:
168      # add /lib/foo.so file
169      lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib')
170      os.makedirs(lib_dir)
171      foo_file = os.path.join(lib_dir, 'foo.so')
172      with open(foo_file, 'w') as lib_file:
173        lib_file.write('This is a placeholder lib file.')
174      foo_digest = get_digest(foo_file)
175
176      # add /lib dir and /lib/foo.so in canned_fs_config
177      f.write(f'/lib 0 2000 0755\n')
178      f.write(f'/lib/foo.so 1000 1000 0644\n')
179
180      # add /lib/bar.so file
181      lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib64')
182      os.makedirs(lib_dir)
183      bar_file = os.path.join(lib_dir, 'bar.so')
184      with open(bar_file, 'w') as lib_file:
185        lib_file.write('This is another placeholder lib file.')
186      bar_digest = get_digest(bar_file)
187
188      # add /lib dir and /lib/foo.so in canned_fs_config
189      f.write(f'/lib64 0 2000 0755\n')
190      f.write(f'/lib64/bar.so 1000 1000 0644\n')
191
192      f.write(f'/ 0 2000 0755\n')
193      f.write(f'/apex_manifest.pb 1000 1000 0644\n')
194
195    # call apexer_with_DCLA_preprocessing
196    manifest_file = os.path.join(apex_dir, 'apex_manifest.pb')
197    build_info_file = os.path.join(apex_dir, 'apex_build_info.pb')
198    key_file = os.path.join(get_current_dir(), TEST_PRIVATE_KEY)
199    apexer= get_host_tool('apexer')
200    apexer_wrapper = get_apexer_with_DCLA_preprocessing()
201    android_jar = get_host_tool('android.jar')
202    apex_out = os.path.join(apex_dir, 'DCLA_preprocessed_output.apex')
203    run_command([apexer_wrapper,
204                 '--apexer', apexer,
205                 '--canned_fs_config', canned_fs_config_file,
206                 os.path.join(apex_dir, 'payload_extract'),
207                 apex_out,
208                 '--',
209                 '--android_jar_path', android_jar,
210                 '--apexer_tool_path', os.path.dirname(apexer),
211                 '--key', key_file,
212                 '--manifest', manifest_file,
213                 '--build_info', build_info_file,
214                 '--payload_fs_type', 'ext4',
215                 '--payload_type', 'image',
216                 '--force'
217                 ])
218
219    # check the existence of updated canned_fs_config
220    updated_canned_fs_config = os.path.join(apex_dir, 'updated_canned_fs_config')
221    self.assertTrue(
222        os.path.isfile(updated_canned_fs_config),
223        'missing updated canned_fs_config file named updated_canned_fs_config')
224
225    # check the resulting apex, it should have /lib/foo.so/<hash>/foo.so and
226    # /lib64/bar.so/<hash>/bar.so
227    result_apex_dir = self.expand_apex(apex_out)
228    replaced_foo = os.path.join(
229        result_apex_dir, f'payload_extract/lib/foo.so/{foo_digest}/foo.so')
230    replaced_bar = os.path.join(
231        result_apex_dir, f'payload_extract/lib64/bar.so/{bar_digest}/bar.so')
232    self.assertTrue(
233        os.path.isfile(replaced_foo),
234        f'expecting /lib/foo.so/{foo_digest}/foo.so')
235    self.assertTrue(
236        os.path.isfile(replaced_bar),
237        f'expecting /lib64/bar.so/{bar_digest}/bar.so')
238
239if __name__ == '__main__':
240  unittest.main(verbosity=2)
241