#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import shutil
import tempfile
import time
import unittest
import zipfile

import common
import validate_target_files


def random_string_with_holes(size, block_size, step_size):
  """Returns a byte string of exactly `size` bytes that is mostly NUL bytes.

  Starting at offset 0 and then every `step_size` bytes, a block of up to
  `block_size` bytes is filled with os.urandom() data. All remaining bytes are
  NUL.

  Args:
    size: Total length of the returned string, in bytes.
    block_size: Length of each random block, in bytes.
    step_size: Distance between the starts of two consecutive random blocks.

  Returns:
    A byte string of length `size`.
  """
  # A bytearray is zero-filled on creation and stores one byte per element,
  # unlike a list, which would need one object slot per byte.
  data = bytearray(size)
  for begin in range(0, size, step_size):
    # Clamp the last block so that the result never grows beyond `size`.
    end = min(begin + block_size, size)
    data[begin:end] = os.urandom(end - begin)
  return bytes(data)


def get_2gb_string():
  """Returns a sparse random byte string that is one byte larger than 2 GiB."""
  kilobytes = 1024
  megabytes = 1024 * kilobytes
  gigabytes = 1024 * megabytes

  size = int(2 * gigabytes + 1)
  block_size = 4 * kilobytes
  step_size = 4 * megabytes
  two_gb_string = random_string_with_holes(
      size, block_size, step_size)
  return two_gb_string


class CommonZipTest(unittest.TestCase):
  """Tests for common.ZipWrite(), common.ZipWriteStr() and common.ZipClose()."""

  def _verify(self, zip_file, zip_file_name, arcname, contents,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Checks one entry of a finished archive against the expected values.

    Args:
      zip_file: Unused; kept so that existing callers keep working. The
          archive is always reopened from zip_file_name.
      zip_file_name: Path of the archive to reopen and inspect.
      arcname: Name of the entry inside the archive.
      contents: Expected payload of the entry.
      test_file_name: Optional path of the on-disk input file. When given, its
          current mode and mtime are compared against expected_stat.
      expected_stat: os.stat() result of the input file, taken before it was
          added to the archive. Required when test_file_name is given.
      expected_mode: Expected permission bits (lowest 9 bits) of the entry.
      expected_compress_type: Expected zipfile compression constant.
    """
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context manager makes sure the
    # handle is released even when one of the assertions below fails.
    with zipfile.ZipFile(zip_file_name, "r") as verify_zip:
      # Verify the timestamp.
      info = verify_zip.getinfo(arcname)
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode.
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents.
      self.assertEqual(verify_zip.read(arcname), contents)
      self.assertIsNone(verify_zip.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Writes `contents` to a temp file, zips it with ZipWrite and verifies.

    Args:
      contents: Data to store in the temporary input file.
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite() ("arcname", "perms", "compress_type").
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      test_file.write(contents)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
                   expected_stat, expected_mode, expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Adds `contents` to a new archive with ZipWriteStr and verifies it.

    Args:
      zinfo_or_arcname: Either the entry name or a zipfile.ZipInfo object.
      contents: Data to store in the entry.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr() ("perms", "compress_type").
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        # _verify() only compares the permission bits, so drop any file type
        # bits that the ZipInfo may carry in its external attributes.
        expected_mode = extra_args.get(
            "perms", (zinfo_or_arcname.external_attr >> 16) & 0o777)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, contents,
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Adds a large file with ZipWrite, then a small string with ZipWriteStr.

    Args:
      large: Data written to a temp file and added via common.ZipWrite().
      small: Data added via common.ZipWriteStr() under the name "bar".
      extra_args: Optional dict of keyword arguments forwarded to both calls.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      test_file.write(large)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large, large,
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small, small,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Asserts that zipfile.ZIP64_LIMIT has its default around func(*args)."""
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    """ZipWrite() with default arguments."""
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    """ZipWrite() with explicit arcname, perms and compress_type."""
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    """ZipWrite() with an input file larger than 2 GiB."""
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    """ZipWrite() must leave zipfile.ZIP64_LIMIT at its default value."""
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")

  def test_ZipWriteStr(self):
    """ZipWriteStr() with default arguments, by name and by ZipInfo."""
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    """ZipWriteStr() with explicit perms and compress_type."""
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    """ZipWriteStr() into an archive that already holds a >2 GiB entry."""
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    """ZipWriteStr() must leave zipfile.ZIP64_LIMIT at its default value."""
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")

  def test_bug21309935(self):
    """Checks how ZipWriteStr() picks the permission bits of an entry."""
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms has the priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, "foo", random_string,
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar", random_string,
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz", random_string,
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux", random_string,
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)


class InstallRecoveryScriptFormatTest(unittest.TestCase):
  """Check the format of install-recovery.sh

  Its format should match between common.py and validate_target_files.py."""

  def setUp(self):
    """Creates a scratch directory and a minimal info dict with an fstab."""
    self._tempdir = tempfile.mkdtemp()
    # Create a dummy dict that contains the fstab info for boot&recovery.
    self._info = {"fstab": {}}
    dummy_fstab = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults",
    ]
    # The "read helper" simply joins the lines above instead of reading a file.
    self._info["fstab"] = common.LoadRecoveryFSTab(lambda x: "\n".join(x),
                                                   2, dummy_fstab)

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    """Writes `data` to <tempdir>/<prefix>/<name>, creating directories."""
    loc = os.path.join(self._tempdir, prefix, name)
    if not os.path.exists(os.path.dirname(loc)):
      os.makedirs(os.path.dirname(loc))
    with open(loc, "w+") as f:
      f.write(data)

  def test_full_recovery(self):
    """Validates the script generated when full_recovery_image is "true"."""
    recovery_image = common.File("recovery.img", "recovery")
    boot_image = common.File("boot.img", "boot")
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_recovery_from_boot(self):
    """Validates the script generated without full_recovery_image set."""
    recovery_image = common.File("recovery.img", "recovery")
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", "boot")
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def tearDown(self):
    """Removes the scratch directory created in setUp()."""
    shutil.rmtree(self._tempdir)