1#!/usr/bin/env python3 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Unit tests for apexer_with_DCLA_preprocessing.""" 18import hashlib 19import logging 20import os 21import shutil 22import stat 23import subprocess 24import tempfile 25from typing import List 26import unittest 27import zipfile 28 29TEST_PRIVATE_KEY = os.path.join('testdata', 'com.android.example.apex.pem') 30TEST_APEX = 'com.android.example.apex' 31 32# In order to debug test failures, set DEBUG_TEST to True and run the test from 33# local workstation bypassing atest, e.g.: 34# $ m apexer_with_DCLA_preprocessing_test && \ 35# out/host/linux-x86/nativetest64/apexer_with_DCLA_preprocessing_test/\ 36# apexer_with_DCLA_preprocessing_test 37# 38# the test will print out the command used, and the temporary files used by the 39# test. 40DEBUG_TEST = False 41 42def get_current_dir(): 43 """returns the current dir, relative to the script dir.""" 44 # The script dir is the one we want, which could be different from pwd. 45 current_dir = os.path.dirname(os.path.realpath(__file__)) 46 return current_dir 47 48# TODO: consolidate these common test utilities into a common python_library_host 49# to be shared across tests under system/apex 50def run_command(cmd: List[str]) -> None: 51 """Run a command.""" 52 try: 53 if DEBUG_TEST: 54 cmd_str = ' '.join(cmd) 55 print(f'\nRunning: \n{cmd_str}\n') 56 res = subprocess.run( 57 cmd, 58 check=True, 59 stdout=subprocess.PIPE, 60 universal_newlines=True, 61 stderr=subprocess.PIPE) 62 except subprocess.CalledProcessError as err: 63 print(err.stderr) 64 print(err.output) 65 raise err 66 67def get_apexer_with_DCLA_preprocessing() -> str: 68 tool_binary = os.path.join(get_current_dir(), 'apexer_with_DCLA_preprocessing') 69 if not os.path.isfile(tool_binary): 70 raise FileNotFoundError(f'cannot find tooling apexer with DCLA preprocessing') 71 else: 72 os.chmod(tool_binary, stat.S_IRUSR | stat.S_IXUSR); 73 return tool_binary 74 75def get_host_tool(tool_name: str) -> str: 76 """get host tools.""" 77 tool_binary = os.path.join(get_current_dir(), 'bin', tool_name) 78 if not os.path.isfile(tool_binary): 79 host_build_top = os.environ.get('ANDROID_BUILD_TOP') 80 if host_build_top: 81 host_command_dir = os.path.join(host_build_top, 'out/host/linux-x86/bin') 82 tool_binary = os.path.join(host_command_dir, tool_name) 83 if not os.path.isfile(tool_binary): 84 host_command_dir = os.path.join(host_build_top, 'prebuilts/sdk/current/public') 85 tool_binary = os.path.join(host_command_dir, tool_name) 86 else: 87 tool_binary = shutil.which(tool_name) 88 89 if not tool_binary or not os.path.isfile(tool_binary): 90 raise FileNotFoundError(f'cannot find tooling {tool_name}') 91 else: 92 return tool_binary 93 94def get_digest(file_path: str) -> str: 95 """Get sha512 digest of a file """ 96 digester = hashlib.sha512() 97 with open(file_path, 'rb') as f: 98 bytes_to_digest = f.read() 99 digester.update(bytes_to_digest) 100 return digester.hexdigest() 101 102class ApexerWithDCLAPreprocessingTest(unittest.TestCase): 103 104 def setUp(self): 105 self._to_cleanup = [] 106 tools_zip_file = os.path.join(get_current_dir(), 'apexer_test_host_tools.zip') 107 self.unzip_host_tools(tools_zip_file) 108 109 def tearDown(self): 110 if not DEBUG_TEST: 111 for i in self._to_cleanup: 112 if os.path.isdir(i): 113 shutil.rmtree(i, ignore_errors=True) 114 else: 115 os.remove(i) 116 del self._to_cleanup[:] 117 else: 118 print('Cleanup: ' + str(self._to_cleanup)) 119 120 def create_temp_dir(self) -> str: 121 tmp_dir = tempfile.mkdtemp() 122 self._to_cleanup.append(tmp_dir) 123 return tmp_dir 124 125 def expand_apex(self, apex_file: str) -> None: 126 """expand an apex file include apex_payload.""" 127 apex_dir = self.create_temp_dir() 128 with zipfile.ZipFile(apex_file, 'r') as apex_zip: 129 apex_zip.extractall(apex_dir) 130 payload_img = os.path.join(apex_dir, 'apex_payload.img') 131 extract_dir = os.path.join(apex_dir, 'payload_extract') 132 os.mkdir(extract_dir) 133 debugfs = get_host_tool('debugfs_static') 134 run_command([debugfs, payload_img, '-R', f'rdump / {extract_dir}']) 135 136 # remove /etc and /lost+found and /payload_extract/apex_manifest.pb 137 lost_and_found = os.path.join(extract_dir, 'lost+found') 138 etc_dir = os.path.join(extract_dir, 'etc') 139 os.remove(os.path.join(extract_dir, 'apex_manifest.pb')) 140 if os.path.isdir(lost_and_found): 141 shutil.rmtree(lost_and_found) 142 if os.path.isdir(etc_dir): 143 shutil.rmtree(etc_dir) 144 145 return apex_dir 146 147 def unzip_host_tools(self, host_tools_file_path: str) -> None: 148 dir_name = get_current_dir() 149 if os.path.isfile(host_tools_file_path): 150 with zipfile.ZipFile(host_tools_file_path, 'r') as zip_obj: 151 zip_obj.extractall(dir_name) 152 153 for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid", 154 "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static", 155 "signapk.jar", "android.jar"]: 156 file_path = os.path.join(dir_name, "bin", i) 157 if os.path.exists(file_path): 158 os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR); 159 160 def test_DCLA_preprocessing(self): 161 """test DCLA preprocessing done properly.""" 162 apex_file = os.path.join(get_current_dir(), TEST_APEX + '.apex') 163 apex_dir = self.expand_apex(apex_file) 164 165 # create apex canned_fs_config file, TEST_APEX does not come with one 166 canned_fs_config_file = os.path.join(apex_dir, 'canned_fs_config') 167 with open(canned_fs_config_file, 'w') as f: 168 # add /lib/foo.so file 169 lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib') 170 os.makedirs(lib_dir) 171 foo_file = os.path.join(lib_dir, 'foo.so') 172 with open(foo_file, 'w') as lib_file: 173 lib_file.write('This is a placeholder lib file.') 174 foo_digest = get_digest(foo_file) 175 176 # add /lib dir and /lib/foo.so in canned_fs_config 177 f.write(f'/lib 0 2000 0755\n') 178 f.write(f'/lib/foo.so 1000 1000 0644\n') 179 180 # add /lib/bar.so file 181 lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib64') 182 os.makedirs(lib_dir) 183 bar_file = os.path.join(lib_dir, 'bar.so') 184 with open(bar_file, 'w') as lib_file: 185 lib_file.write('This is another placeholder lib file.') 186 bar_digest = get_digest(bar_file) 187 188 # add /lib dir and /lib/foo.so in canned_fs_config 189 f.write(f'/lib64 0 2000 0755\n') 190 f.write(f'/lib64/bar.so 1000 1000 0644\n') 191 192 f.write(f'/ 0 2000 0755\n') 193 f.write(f'/apex_manifest.pb 1000 1000 0644\n') 194 195 # call apexer_with_DCLA_preprocessing 196 manifest_file = os.path.join(apex_dir, 'apex_manifest.pb') 197 build_info_file = os.path.join(apex_dir, 'apex_build_info.pb') 198 key_file = os.path.join(get_current_dir(), TEST_PRIVATE_KEY) 199 apexer= get_host_tool('apexer') 200 apexer_wrapper = get_apexer_with_DCLA_preprocessing() 201 android_jar = get_host_tool('android.jar') 202 apex_out = os.path.join(apex_dir, 'DCLA_preprocessed_output.apex') 203 run_command([apexer_wrapper, 204 '--apexer', apexer, 205 '--canned_fs_config', canned_fs_config_file, 206 os.path.join(apex_dir, 'payload_extract'), 207 apex_out, 208 '--', 209 '--android_jar_path', android_jar, 210 '--apexer_tool_path', os.path.dirname(apexer), 211 '--key', key_file, 212 '--manifest', manifest_file, 213 '--build_info', build_info_file, 214 '--payload_fs_type', 'ext4', 215 '--payload_type', 'image', 216 '--force' 217 ]) 218 219 # check the existence of updated canned_fs_config 220 updated_canned_fs_config = os.path.join(apex_dir, 'updated_canned_fs_config') 221 self.assertTrue( 222 os.path.isfile(updated_canned_fs_config), 223 'missing updated canned_fs_config file named updated_canned_fs_config') 224 225 # check the resulting apex, it should have /lib/foo.so/<hash>/foo.so and 226 # /lib64/bar.so/<hash>/bar.so 227 result_apex_dir = self.expand_apex(apex_out) 228 replaced_foo = os.path.join( 229 result_apex_dir, f'payload_extract/lib/foo.so/{foo_digest}/foo.so') 230 replaced_bar = os.path.join( 231 result_apex_dir, f'payload_extract/lib64/bar.so/{bar_digest}/bar.so') 232 self.assertTrue( 233 os.path.isfile(replaced_foo), 234 f'expecting /lib/foo.so/{foo_digest}/foo.so') 235 self.assertTrue( 236 os.path.isfile(replaced_bar), 237 f'expecting /lib64/bar.so/{bar_digest}/bar.so') 238 239if __name__ == '__main__': 240 unittest.main(verbosity=2) 241