1# 2# Copyright (C) 2015 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import copy 18import os 19import subprocess 20import tempfile 21import time 22import zipfile 23from hashlib import sha1 24 25import common 26import test_utils 27import validate_target_files 28from images import EmptyImage, DataImage 29from rangelib import RangeSet 30 31 32KiB = 1024 33MiB = 1024 * KiB 34GiB = 1024 * MiB 35 36 37def get_2gb_string(): 38 size = int(2 * GiB + 1) 39 block_size = 4 * KiB 40 step_size = 4 * MiB 41 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'. 42 for _ in range(0, size, step_size): 43 yield os.urandom(block_size) 44 yield b'\0' * (step_size - block_size) 45 46 47class BuildInfoTest(test_utils.ReleaseToolsTestCase): 48 49 TEST_INFO_DICT = { 50 'build.prop': common.PartitionBuildProps.FromDictionary( 51 'system', { 52 'ro.product.device': 'product-device', 53 'ro.product.name': 'product-name', 54 'ro.build.fingerprint': 'build-fingerprint', 55 'ro.build.foo': 'build-foo'} 56 ), 57 'system.build.prop': common.PartitionBuildProps.FromDictionary( 58 'system', { 59 'ro.product.system.brand': 'product-brand', 60 'ro.product.system.name': 'product-name', 61 'ro.product.system.device': 'product-device', 62 'ro.system.build.version.release': 'version-release', 63 'ro.system.build.id': 'build-id', 64 'ro.system.build.version.incremental': 'version-incremental', 65 'ro.system.build.type': 'build-type', 66 'ro.system.build.tags': 'build-tags', 67 'ro.system.build.foo': 'build-foo'} 68 ), 69 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 70 'vendor', { 71 'ro.product.vendor.brand': 'vendor-product-brand', 72 'ro.product.vendor.name': 'vendor-product-name', 73 'ro.product.vendor.device': 'vendor-product-device', 74 'ro.vendor.build.version.release': 'vendor-version-release', 75 'ro.vendor.build.id': 'vendor-build-id', 76 'ro.vendor.build.version.incremental': 77 'vendor-version-incremental', 78 'ro.vendor.build.type': 'vendor-build-type', 79 'ro.vendor.build.tags': 'vendor-build-tags'} 80 ), 81 'property1': 'value1', 82 'property2': 4096, 83 } 84 85 TEST_INFO_DICT_USES_OEM_PROPS = { 86 'build.prop': common.PartitionBuildProps.FromDictionary( 87 'system', { 88 'ro.product.name': 'product-name', 89 'ro.build.thumbprint': 'build-thumbprint', 90 'ro.build.bar': 'build-bar'} 91 ), 92 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 93 'vendor', { 94 'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'} 95 ), 96 'property1': 'value1', 97 'property2': 4096, 98 'oem_fingerprint_properties': 'ro.product.device ro.product.brand', 99 } 100 101 TEST_OEM_DICTS = [ 102 { 103 'ro.product.brand': 'brand1', 104 'ro.product.device': 'device1', 105 }, 106 { 107 'ro.product.brand': 'brand2', 108 'ro.product.device': 'device2', 109 }, 110 { 111 'ro.product.brand': 'brand3', 112 'ro.product.device': 'device3', 113 }, 114 ] 115 116 TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = { 117 'build.prop': common.PartitionBuildProps.FromDictionary( 118 'system', { 119 'ro.build.fingerprint': 'build-fingerprint', 120 'ro.product.property_source_order': 121 'product,odm,vendor,system_ext,system'} 122 ), 123 'system.build.prop': common.PartitionBuildProps.FromDictionary( 124 'system', { 125 'ro.product.system.device': 'system-product-device'} 126 ), 127 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 128 'vendor', { 129 'ro.product.vendor.device': 'vendor-product-device'} 130 ), 131 } 132 133 TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = { 134 'build.prop': common.PartitionBuildProps.FromDictionary( 135 'system', { 136 'ro.build.fingerprint': 'build-fingerprint', 137 'ro.product.property_source_order': 138 'product,product_services,odm,vendor,system', 139 'ro.build.version.release': '10', 140 'ro.build.version.codename': 'REL'} 141 ), 142 'system.build.prop': common.PartitionBuildProps.FromDictionary( 143 'system', { 144 'ro.product.system.device': 'system-product-device'} 145 ), 146 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 147 'vendor', { 148 'ro.product.vendor.device': 'vendor-product-device'} 149 ), 150 } 151 152 TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = { 153 'build.prop': common.PartitionBuildProps.FromDictionary( 154 'system', { 155 'ro.product.device': 'product-device', 156 'ro.build.fingerprint': 'build-fingerprint', 157 'ro.build.version.release': '9', 158 'ro.build.version.codename': 'REL'} 159 ), 160 'system.build.prop': common.PartitionBuildProps.FromDictionary( 161 'system', { 162 'ro.product.system.device': 'system-product-device'} 163 ), 164 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 165 'vendor', { 166 'ro.product.vendor.device': 'vendor-product-device'} 167 ), 168 } 169 170 def test_init(self): 171 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 172 self.assertEqual('product-device', target_info.device) 173 self.assertEqual('build-fingerprint', target_info.fingerprint) 174 self.assertFalse(target_info.is_ab) 175 self.assertIsNone(target_info.oem_props) 176 177 def test_init_with_oem_props(self): 178 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 179 self.TEST_OEM_DICTS) 180 self.assertEqual('device1', target_info.device) 181 self.assertEqual('brand1/product-name/device1:build-thumbprint', 182 target_info.fingerprint) 183 184 # Swap the order in oem_dicts, which would lead to different BuildInfo. 185 oem_dicts = copy.copy(self.TEST_OEM_DICTS) 186 oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0] 187 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 188 oem_dicts) 189 self.assertEqual('device3', target_info.device) 190 self.assertEqual('brand3/product-name/device3:build-thumbprint', 191 target_info.fingerprint) 192 193 def test_init_badFingerprint(self): 194 info_dict = copy.deepcopy(self.TEST_INFO_DICT) 195 info_dict['build.prop'].build_props[ 196 'ro.build.fingerprint'] = 'bad fingerprint' 197 self.assertRaises(ValueError, common.BuildInfo, info_dict, None) 198 199 info_dict['build.prop'].build_props[ 200 'ro.build.fingerprint'] = 'bad\x80fingerprint' 201 self.assertRaises(ValueError, common.BuildInfo, info_dict, None) 202 203 def test___getitem__(self): 204 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 205 self.assertEqual('value1', target_info['property1']) 206 self.assertEqual(4096, target_info['property2']) 207 self.assertEqual('build-foo', 208 target_info['build.prop'].GetProp('ro.build.foo')) 209 210 def test___getitem__with_oem_props(self): 211 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 212 self.TEST_OEM_DICTS) 213 self.assertEqual('value1', target_info['property1']) 214 self.assertEqual(4096, target_info['property2']) 215 self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo')) 216 217 def test___setitem__(self): 218 target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None) 219 self.assertEqual('value1', target_info['property1']) 220 target_info['property1'] = 'value2' 221 self.assertEqual('value2', target_info['property1']) 222 223 self.assertEqual('build-foo', 224 target_info['build.prop'].GetProp('ro.build.foo')) 225 target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar' 226 self.assertEqual('build-bar', 227 target_info['build.prop'].GetProp('ro.build.foo')) 228 229 def test_get(self): 230 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 231 self.assertEqual('value1', target_info.get('property1')) 232 self.assertEqual(4096, target_info.get('property2')) 233 self.assertEqual(4096, target_info.get('property2', 1024)) 234 self.assertEqual(1024, target_info.get('property-nonexistent', 1024)) 235 self.assertEqual('build-foo', 236 target_info.get('build.prop').GetProp('ro.build.foo')) 237 238 def test_get_with_oem_props(self): 239 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 240 self.TEST_OEM_DICTS) 241 self.assertEqual('value1', target_info.get('property1')) 242 self.assertEqual(4096, target_info.get('property2')) 243 self.assertEqual(4096, target_info.get('property2', 1024)) 244 self.assertEqual(1024, target_info.get('property-nonexistent', 1024)) 245 self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo')) 246 247 def test_items(self): 248 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 249 items = target_info.items() 250 self.assertIn(('property1', 'value1'), items) 251 self.assertIn(('property2', 4096), items) 252 253 def test_GetBuildProp(self): 254 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 255 self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo')) 256 self.assertRaises(common.ExternalError, target_info.GetBuildProp, 257 'ro.build.nonexistent') 258 259 def test_GetBuildProp_with_oem_props(self): 260 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 261 self.TEST_OEM_DICTS) 262 self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar')) 263 self.assertRaises(common.ExternalError, target_info.GetBuildProp, 264 'ro.build.nonexistent') 265 266 def test_GetPartitionFingerprint(self): 267 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 268 self.assertEqual( 269 target_info.GetPartitionFingerprint('vendor'), 270 'vendor-product-brand/vendor-product-name/vendor-product-device' 271 ':vendor-version-release/vendor-build-id/vendor-version-incremental' 272 ':vendor-build-type/vendor-build-tags') 273 274 def test_GetPartitionFingerprint_system_other_uses_system(self): 275 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 276 self.assertEqual( 277 target_info.GetPartitionFingerprint('system_other'), 278 target_info.GetPartitionFingerprint('system')) 279 280 def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self): 281 info_dict = copy.deepcopy(self.TEST_INFO_DICT) 282 info_dict['vendor.build.prop'].build_props[ 283 'ro.vendor.build.fingerprint'] = 'vendor:fingerprint' 284 target_info = common.BuildInfo(info_dict, None) 285 self.assertEqual( 286 target_info.GetPartitionFingerprint('vendor'), 287 'vendor:fingerprint') 288 289 def test_WriteMountOemScript(self): 290 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 291 self.TEST_OEM_DICTS) 292 script_writer = test_utils.MockScriptWriter() 293 target_info.WriteMountOemScript(script_writer) 294 self.assertEqual([('Mount', '/oem', None)], script_writer.lines) 295 296 def test_WriteDeviceAssertions(self): 297 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 298 script_writer = test_utils.MockScriptWriter() 299 target_info.WriteDeviceAssertions(script_writer, False) 300 self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines) 301 302 def test_WriteDeviceAssertions_with_oem_props(self): 303 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 304 self.TEST_OEM_DICTS) 305 script_writer = test_utils.MockScriptWriter() 306 target_info.WriteDeviceAssertions(script_writer, False) 307 self.assertEqual( 308 [ 309 ('AssertOemProperty', 'ro.product.device', 310 ['device1', 'device2', 'device3'], False), 311 ('AssertOemProperty', 'ro.product.brand', 312 ['brand1', 'brand2', 'brand3'], False), 313 ], 314 script_writer.lines) 315 316 def test_ResolveRoProductProperty_FromVendor(self): 317 info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER) 318 info = common.BuildInfo(info_dict, None) 319 self.assertEqual('vendor-product-device', 320 info.GetBuildProp('ro.product.device')) 321 322 def test_ResolveRoProductProperty_FromSystem(self): 323 info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER) 324 del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device'] 325 info = common.BuildInfo(info_dict, None) 326 self.assertEqual('system-product-device', 327 info.GetBuildProp('ro.product.device')) 328 329 def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self): 330 info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER) 331 info_dict['build.prop'].build_props[ 332 'ro.product.property_source_order'] = 'bad-source' 333 with self.assertRaisesRegexp(common.ExternalError, 334 'Invalid ro.product.property_source_order'): 335 info = common.BuildInfo(info_dict, None) 336 info.GetBuildProp('ro.product.device') 337 338 def test_ResolveRoProductProperty_Android10PropertySearchOrder(self): 339 info_dict = copy.deepcopy( 340 self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10) 341 info = common.BuildInfo(info_dict, None) 342 self.assertEqual('vendor-product-device', 343 info.GetBuildProp('ro.product.device')) 344 345 def test_ResolveRoProductProperty_Android9PropertySearchOrder(self): 346 info_dict = copy.deepcopy( 347 self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9) 348 info = common.BuildInfo(info_dict, None) 349 self.assertEqual('product-device', 350 info.GetBuildProp('ro.product.device')) 351 352 353class CommonZipTest(test_utils.ReleaseToolsTestCase): 354 355 def _verify(self, zip_file, zip_file_name, arcname, expected_hash, 356 test_file_name=None, expected_stat=None, expected_mode=0o644, 357 expected_compress_type=zipfile.ZIP_STORED): 358 # Verify the stat if present. 359 if test_file_name is not None: 360 new_stat = os.stat(test_file_name) 361 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode)) 362 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime)) 363 364 # Reopen the zip file to verify. 365 zip_file = zipfile.ZipFile(zip_file_name, "r") 366 367 # Verify the timestamp. 368 info = zip_file.getinfo(arcname) 369 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0)) 370 371 # Verify the file mode. 372 mode = (info.external_attr >> 16) & 0o777 373 self.assertEqual(mode, expected_mode) 374 375 # Verify the compress type. 376 self.assertEqual(info.compress_type, expected_compress_type) 377 378 # Verify the zip contents. 379 entry = zip_file.open(arcname) 380 sha1_hash = sha1() 381 for chunk in iter(lambda: entry.read(4 * MiB), b''): 382 sha1_hash.update(chunk) 383 self.assertEqual(expected_hash, sha1_hash.hexdigest()) 384 self.assertIsNone(zip_file.testzip()) 385 386 def _test_ZipWrite(self, contents, extra_zipwrite_args=None): 387 extra_zipwrite_args = dict(extra_zipwrite_args or {}) 388 389 test_file = tempfile.NamedTemporaryFile(delete=False) 390 test_file_name = test_file.name 391 392 zip_file = tempfile.NamedTemporaryFile(delete=False) 393 zip_file_name = zip_file.name 394 395 # File names within an archive strip the leading slash. 396 arcname = extra_zipwrite_args.get("arcname", test_file_name) 397 if arcname[0] == "/": 398 arcname = arcname[1:] 399 400 zip_file.close() 401 zip_file = zipfile.ZipFile(zip_file_name, "w") 402 403 try: 404 sha1_hash = sha1() 405 for data in contents: 406 sha1_hash.update(bytes(data)) 407 test_file.write(bytes(data)) 408 test_file.close() 409 410 expected_stat = os.stat(test_file_name) 411 expected_mode = extra_zipwrite_args.get("perms", 0o644) 412 expected_compress_type = extra_zipwrite_args.get("compress_type", 413 zipfile.ZIP_STORED) 414 time.sleep(5) # Make sure the atime/mtime will change measurably. 415 416 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args) 417 common.ZipClose(zip_file) 418 419 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(), 420 test_file_name, expected_stat, expected_mode, 421 expected_compress_type) 422 finally: 423 os.remove(test_file_name) 424 os.remove(zip_file_name) 425 426 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None): 427 extra_args = dict(extra_args or {}) 428 429 zip_file = tempfile.NamedTemporaryFile(delete=False) 430 zip_file_name = zip_file.name 431 zip_file.close() 432 433 zip_file = zipfile.ZipFile(zip_file_name, "w") 434 435 try: 436 expected_compress_type = extra_args.get("compress_type", 437 zipfile.ZIP_STORED) 438 time.sleep(5) # Make sure the atime/mtime will change measurably. 439 440 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo): 441 arcname = zinfo_or_arcname 442 expected_mode = extra_args.get("perms", 0o644) 443 else: 444 arcname = zinfo_or_arcname.filename 445 if zinfo_or_arcname.external_attr: 446 zinfo_perms = zinfo_or_arcname.external_attr >> 16 447 else: 448 zinfo_perms = 0o600 449 expected_mode = extra_args.get("perms", zinfo_perms) 450 451 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args) 452 common.ZipClose(zip_file) 453 454 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(), 455 expected_mode=expected_mode, 456 expected_compress_type=expected_compress_type) 457 finally: 458 os.remove(zip_file_name) 459 460 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None): 461 extra_args = dict(extra_args or {}) 462 463 zip_file = tempfile.NamedTemporaryFile(delete=False) 464 zip_file_name = zip_file.name 465 466 test_file = tempfile.NamedTemporaryFile(delete=False) 467 test_file_name = test_file.name 468 469 arcname_large = test_file_name 470 arcname_small = "bar" 471 472 # File names within an archive strip the leading slash. 473 if arcname_large[0] == "/": 474 arcname_large = arcname_large[1:] 475 476 zip_file.close() 477 zip_file = zipfile.ZipFile(zip_file_name, "w") 478 479 try: 480 sha1_hash = sha1() 481 for data in large: 482 sha1_hash.update(data) 483 test_file.write(data) 484 test_file.close() 485 486 expected_stat = os.stat(test_file_name) 487 expected_mode = 0o644 488 expected_compress_type = extra_args.get("compress_type", 489 zipfile.ZIP_STORED) 490 time.sleep(5) # Make sure the atime/mtime will change measurably. 491 492 common.ZipWrite(zip_file, test_file_name, **extra_args) 493 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args) 494 common.ZipClose(zip_file) 495 496 # Verify the contents written by ZipWrite(). 497 self._verify(zip_file, zip_file_name, arcname_large, 498 sha1_hash.hexdigest(), test_file_name, expected_stat, 499 expected_mode, expected_compress_type) 500 501 # Verify the contents written by ZipWriteStr(). 502 self._verify(zip_file, zip_file_name, arcname_small, 503 sha1(small).hexdigest(), 504 expected_compress_type=expected_compress_type) 505 finally: 506 os.remove(zip_file_name) 507 os.remove(test_file_name) 508 509 def _test_reset_ZIP64_LIMIT(self, func, *args): 510 default_limit = (1 << 31) - 1 511 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) 512 func(*args) 513 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) 514 515 def test_ZipWrite(self): 516 file_contents = os.urandom(1024) 517 self._test_ZipWrite(file_contents) 518 519 def test_ZipWrite_with_opts(self): 520 file_contents = os.urandom(1024) 521 self._test_ZipWrite(file_contents, { 522 "arcname": "foobar", 523 "perms": 0o777, 524 "compress_type": zipfile.ZIP_DEFLATED, 525 }) 526 self._test_ZipWrite(file_contents, { 527 "arcname": "foobar", 528 "perms": 0o700, 529 "compress_type": zipfile.ZIP_STORED, 530 }) 531 532 def test_ZipWrite_large_file(self): 533 file_contents = get_2gb_string() 534 self._test_ZipWrite(file_contents, { 535 "compress_type": zipfile.ZIP_DEFLATED, 536 }) 537 538 def test_ZipWrite_resets_ZIP64_LIMIT(self): 539 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "") 540 541 def test_ZipWriteStr(self): 542 random_string = os.urandom(1024) 543 # Passing arcname 544 self._test_ZipWriteStr("foo", random_string) 545 546 # Passing zinfo 547 zinfo = zipfile.ZipInfo(filename="foo") 548 self._test_ZipWriteStr(zinfo, random_string) 549 550 # Timestamp in the zinfo should be overwritten. 551 zinfo.date_time = (2015, 3, 1, 15, 30, 0) 552 self._test_ZipWriteStr(zinfo, random_string) 553 554 def test_ZipWriteStr_with_opts(self): 555 random_string = os.urandom(1024) 556 # Passing arcname 557 self._test_ZipWriteStr("foo", random_string, { 558 "perms": 0o700, 559 "compress_type": zipfile.ZIP_DEFLATED, 560 }) 561 self._test_ZipWriteStr("bar", random_string, { 562 "compress_type": zipfile.ZIP_STORED, 563 }) 564 565 # Passing zinfo 566 zinfo = zipfile.ZipInfo(filename="foo") 567 self._test_ZipWriteStr(zinfo, random_string, { 568 "compress_type": zipfile.ZIP_DEFLATED, 569 }) 570 self._test_ZipWriteStr(zinfo, random_string, { 571 "perms": 0o600, 572 "compress_type": zipfile.ZIP_STORED, 573 }) 574 self._test_ZipWriteStr(zinfo, random_string, { 575 "perms": 0o000, 576 "compress_type": zipfile.ZIP_STORED, 577 }) 578 579 def test_ZipWriteStr_large_file(self): 580 # zipfile.writestr() doesn't work when the str size is over 2GiB even with 581 # the workaround. We will only test the case of writing a string into a 582 # large archive. 583 long_string = get_2gb_string() 584 short_string = os.urandom(1024) 585 self._test_ZipWriteStr_large_file(long_string, short_string, { 586 "compress_type": zipfile.ZIP_DEFLATED, 587 }) 588 589 def test_ZipWriteStr_resets_ZIP64_LIMIT(self): 590 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'') 591 zinfo = zipfile.ZipInfo(filename="foo") 592 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'') 593 594 def test_bug21309935(self): 595 zip_file = tempfile.NamedTemporaryFile(delete=False) 596 zip_file_name = zip_file.name 597 zip_file.close() 598 599 try: 600 random_string = os.urandom(1024) 601 zip_file = zipfile.ZipFile(zip_file_name, "w") 602 # Default perms should be 0o644 when passing the filename. 603 common.ZipWriteStr(zip_file, "foo", random_string) 604 # Honor the specified perms. 605 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755) 606 # The perms in zinfo should be untouched. 607 zinfo = zipfile.ZipInfo(filename="baz") 608 zinfo.external_attr = 0o740 << 16 609 common.ZipWriteStr(zip_file, zinfo, random_string) 610 # Explicitly specified perms has the priority. 611 zinfo = zipfile.ZipInfo(filename="qux") 612 zinfo.external_attr = 0o700 << 16 613 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400) 614 common.ZipClose(zip_file) 615 616 self._verify(zip_file, zip_file_name, "foo", 617 sha1(random_string).hexdigest(), 618 expected_mode=0o644) 619 self._verify(zip_file, zip_file_name, "bar", 620 sha1(random_string).hexdigest(), 621 expected_mode=0o755) 622 self._verify(zip_file, zip_file_name, "baz", 623 sha1(random_string).hexdigest(), 624 expected_mode=0o740) 625 self._verify(zip_file, zip_file_name, "qux", 626 sha1(random_string).hexdigest(), 627 expected_mode=0o400) 628 finally: 629 os.remove(zip_file_name) 630 631 @test_utils.SkipIfExternalToolsUnavailable() 632 def test_ZipDelete(self): 633 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip') 634 output_zip = zipfile.ZipFile(zip_file.name, 'w', 635 compression=zipfile.ZIP_DEFLATED) 636 with tempfile.NamedTemporaryFile() as entry_file: 637 entry_file.write(os.urandom(1024)) 638 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 639 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 640 common.ZipWrite(output_zip, entry_file.name, arcname='Test3') 641 common.ZipClose(output_zip) 642 zip_file.close() 643 644 try: 645 common.ZipDelete(zip_file.name, 'Test2') 646 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 647 entries = check_zip.namelist() 648 self.assertTrue('Test1' in entries) 649 self.assertFalse('Test2' in entries) 650 self.assertTrue('Test3' in entries) 651 652 self.assertRaises( 653 common.ExternalError, common.ZipDelete, zip_file.name, 'Test2') 654 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 655 entries = check_zip.namelist() 656 self.assertTrue('Test1' in entries) 657 self.assertFalse('Test2' in entries) 658 self.assertTrue('Test3' in entries) 659 660 common.ZipDelete(zip_file.name, ['Test3']) 661 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 662 entries = check_zip.namelist() 663 self.assertTrue('Test1' in entries) 664 self.assertFalse('Test2' in entries) 665 self.assertFalse('Test3' in entries) 666 667 common.ZipDelete(zip_file.name, ['Test1', 'Test2']) 668 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 669 entries = check_zip.namelist() 670 self.assertFalse('Test1' in entries) 671 self.assertFalse('Test2' in entries) 672 self.assertFalse('Test3' in entries) 673 finally: 674 os.remove(zip_file.name) 675 676 @staticmethod 677 def _test_UnzipTemp_createZipFile(): 678 zip_file = common.MakeTempFile(suffix='.zip') 679 output_zip = zipfile.ZipFile( 680 zip_file, 'w', compression=zipfile.ZIP_DEFLATED) 681 contents = os.urandom(1024) 682 with tempfile.NamedTemporaryFile() as entry_file: 683 entry_file.write(contents) 684 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 685 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 686 common.ZipWrite(output_zip, entry_file.name, arcname='Foo3') 687 common.ZipWrite(output_zip, entry_file.name, arcname='Bar4') 688 common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5') 689 common.ZipClose(output_zip) 690 common.ZipClose(output_zip) 691 return zip_file 692 693 @test_utils.SkipIfExternalToolsUnavailable() 694 def test_UnzipTemp(self): 695 zip_file = self._test_UnzipTemp_createZipFile() 696 unzipped_dir = common.UnzipTemp(zip_file) 697 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 698 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 699 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 700 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 701 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 702 703 @test_utils.SkipIfExternalToolsUnavailable() 704 def test_UnzipTemp_withPatterns(self): 705 zip_file = self._test_UnzipTemp_createZipFile() 706 707 unzipped_dir = common.UnzipTemp(zip_file, ['Test1']) 708 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 709 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 710 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 711 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 712 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 713 714 unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3']) 715 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 716 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 717 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 718 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 719 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 720 721 unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*']) 722 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 723 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 724 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 725 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 726 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 727 728 unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*']) 729 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 730 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 731 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 732 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 733 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 734 735 def test_UnzipTemp_withEmptyPatterns(self): 736 zip_file = self._test_UnzipTemp_createZipFile() 737 unzipped_dir = common.UnzipTemp(zip_file, []) 738 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 739 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 740 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 741 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 742 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 743 744 @test_utils.SkipIfExternalToolsUnavailable() 745 def test_UnzipTemp_withPartiallyMatchingPatterns(self): 746 zip_file = self._test_UnzipTemp_createZipFile() 747 unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*']) 748 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 749 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 750 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 751 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 752 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 753 754 def test_UnzipTemp_withNoMatchingPatterns(self): 755 zip_file = self._test_UnzipTemp_createZipFile() 756 unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*']) 757 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 758 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 759 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 760 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 761 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 762 763 764class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase): 765 """Tests the APK utils related functions.""" 766 767 APKCERTS_TXT1 = ( 768 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"' 769 ' private_key="certs/devkey.pk8"\n' 770 'name="Settings.apk"' 771 ' certificate="build/make/target/product/security/platform.x509.pem"' 772 ' private_key="build/make/target/product/security/platform.pk8"\n' 773 'name="TV.apk" certificate="PRESIGNED" private_key=""\n' 774 ) 775 776 APKCERTS_CERTMAP1 = { 777 'RecoveryLocalizer.apk' : 'certs/devkey', 778 'Settings.apk' : 'build/make/target/product/security/platform', 779 'TV.apk' : 'PRESIGNED', 780 } 781 782 APKCERTS_TXT2 = ( 783 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"' 784 ' private_key="certs/compressed1.pk8" compressed="gz"\n' 785 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"' 786 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 787 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"' 788 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 789 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"' 790 ' private_key="certs/compressed3.pk8" compressed="gz"\n' 791 ) 792 793 APKCERTS_CERTMAP2 = { 794 'Compressed1.apk' : 'certs/compressed1', 795 'Compressed2a.apk' : 'certs/compressed2', 796 'Compressed2b.apk' : 'certs/compressed2', 797 'Compressed3.apk' : 'certs/compressed3', 798 } 799 800 APKCERTS_TXT3 = ( 801 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"' 802 ' private_key="certs/compressed4.pk8" compressed="xz"\n' 803 ) 804 805 APKCERTS_CERTMAP3 = { 806 'Compressed4.apk' : 'certs/compressed4', 807 } 808 809 # Test parsing with no optional fields, both optional fields, and only the 810 # partition optional field. 811 APKCERTS_TXT4 = ( 812 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"' 813 ' private_key="certs/devkey.pk8"\n' 814 'name="Settings.apk"' 815 ' certificate="build/make/target/product/security/platform.x509.pem"' 816 ' private_key="build/make/target/product/security/platform.pk8"' 817 ' compressed="gz" partition="system"\n' 818 'name="TV.apk" certificate="PRESIGNED" private_key=""' 819 ' partition="product"\n' 820 ) 821 822 APKCERTS_CERTMAP4 = { 823 'RecoveryLocalizer.apk' : 'certs/devkey', 824 'Settings.apk' : 'build/make/target/product/security/platform', 825 'TV.apk' : 'PRESIGNED', 826 } 827 828 def setUp(self): 829 self.testdata_dir = test_utils.get_testdata_dir() 830 831 @staticmethod 832 def _write_apkcerts_txt(apkcerts_txt, additional=None): 833 if additional is None: 834 additional = [] 835 target_files = common.MakeTempFile(suffix='.zip') 836 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 837 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt) 838 for entry in additional: 839 target_files_zip.writestr(entry, '') 840 return target_files 841 842 def test_ReadApkCerts_NoncompressedApks(self): 843 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1) 844 with zipfile.ZipFile(target_files, 'r') as input_zip: 845 certmap, ext = common.ReadApkCerts(input_zip) 846 847 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap) 848 self.assertIsNone(ext) 849 850 def test_ReadApkCerts_CompressedApks(self): 851 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is 852 # not stored in '.gz' format, so it shouldn't be considered as installed. 853 target_files = self._write_apkcerts_txt( 854 self.APKCERTS_TXT2, 855 ['Compressed1.apk.gz', 'Compressed3.apk']) 856 857 with zipfile.ZipFile(target_files, 'r') as input_zip: 858 certmap, ext = common.ReadApkCerts(input_zip) 859 860 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap) 861 self.assertEqual('.gz', ext) 862 863 # Alternative case with '.xz'. 864 target_files = self._write_apkcerts_txt( 865 self.APKCERTS_TXT3, ['Compressed4.apk.xz']) 866 867 with zipfile.ZipFile(target_files, 'r') as input_zip: 868 certmap, ext = common.ReadApkCerts(input_zip) 869 870 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap) 871 self.assertEqual('.xz', ext) 872 873 def test_ReadApkCerts_CompressedAndNoncompressedApks(self): 874 target_files = self._write_apkcerts_txt( 875 self.APKCERTS_TXT1 + self.APKCERTS_TXT2, 876 ['Compressed1.apk.gz', 'Compressed3.apk']) 877 878 with zipfile.ZipFile(target_files, 'r') as input_zip: 879 certmap, ext = common.ReadApkCerts(input_zip) 880 881 certmap_merged = self.APKCERTS_CERTMAP1.copy() 882 certmap_merged.update(self.APKCERTS_CERTMAP2) 883 self.assertDictEqual(certmap_merged, certmap) 884 self.assertEqual('.gz', ext) 885 886 def test_ReadApkCerts_MultipleCompressionMethods(self): 887 target_files = self._write_apkcerts_txt( 888 self.APKCERTS_TXT2 + self.APKCERTS_TXT3, 889 ['Compressed1.apk.gz', 'Compressed4.apk.xz']) 890 891 with zipfile.ZipFile(target_files, 'r') as input_zip: 892 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 893 894 def test_ReadApkCerts_MismatchingKeys(self): 895 malformed_apkcerts_txt = ( 896 'name="App1.apk" certificate="certs/cert1.x509.pem"' 897 ' private_key="certs/cert2.pk8"\n' 898 ) 899 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt) 900 901 with zipfile.ZipFile(target_files, 'r') as input_zip: 902 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 903 904 def test_ReadApkCerts_WithWithoutOptionalFields(self): 905 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4) 906 with zipfile.ZipFile(target_files, 'r') as input_zip: 907 certmap, ext = common.ReadApkCerts(input_zip) 908 909 self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap) 910 self.assertIsNone(ext) 911 912 def test_ExtractPublicKey(self): 913 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 914 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 915 with open(pubkey) as pubkey_fp: 916 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert)) 917 918 def test_ExtractPublicKey_invalidInput(self): 919 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8') 920 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input) 921 922 @test_utils.SkipIfExternalToolsUnavailable() 923 def test_ExtractAvbPublicKey(self): 924 privkey = os.path.join(self.testdata_dir, 'testkey.key') 925 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 926 extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey) 927 extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey) 928 with open(extracted_from_privkey, 'rb') as privkey_fp, \ 929 open(extracted_from_pubkey, 'rb') as pubkey_fp: 930 self.assertEqual(privkey_fp.read(), pubkey_fp.read()) 931 932 def test_ParseCertificate(self): 933 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 934 935 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER'] 936 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, 937 universal_newlines=False) 938 expected, _ = proc.communicate() 939 self.assertEqual(0, proc.returncode) 940 941 with open(cert) as cert_fp: 942 actual = common.ParseCertificate(cert_fp.read()) 943 self.assertEqual(expected, actual) 944 945 @test_utils.SkipIfExternalToolsUnavailable() 946 def test_GetMinSdkVersion(self): 947 test_app = os.path.join(self.testdata_dir, 'TestApp.apk') 948 self.assertEqual('24', common.GetMinSdkVersion(test_app)) 949 950 @test_utils.SkipIfExternalToolsUnavailable() 951 def test_GetMinSdkVersion_invalidInput(self): 952 self.assertRaises( 953 common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk') 954 955 @test_utils.SkipIfExternalToolsUnavailable() 956 def test_GetMinSdkVersionInt(self): 957 test_app = os.path.join(self.testdata_dir, 'TestApp.apk') 958 self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {})) 959 960 @test_utils.SkipIfExternalToolsUnavailable() 961 def test_GetMinSdkVersionInt_invalidInput(self): 962 self.assertRaises( 963 common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk', 964 {}) 965 966 967class CommonUtilsTest(test_utils.ReleaseToolsTestCase): 968 969 def setUp(self): 970 self.testdata_dir = test_utils.get_testdata_dir() 971 972 @test_utils.SkipIfExternalToolsUnavailable() 973 def test_GetSparseImage_emptyBlockMapFile(self): 974 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 975 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 976 target_files_zip.write( 977 test_utils.construct_sparse_image([ 978 (0xCAC1, 6), 979 (0xCAC3, 3), 980 (0xCAC1, 4)]), 981 arcname='IMAGES/system.img') 982 target_files_zip.writestr('IMAGES/system.map', '') 983 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 984 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 985 986 tempdir = common.UnzipTemp(target_files) 987 with zipfile.ZipFile(target_files, 'r') as input_zip: 988 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 989 990 self.assertDictEqual( 991 { 992 '__COPY': RangeSet("0"), 993 '__NONZERO-0': RangeSet("1-5 9-12"), 994 }, 995 sparse_image.file_map) 996 997 def test_GetSparseImage_missingImageFile(self): 998 self.assertRaises( 999 AssertionError, common.GetSparseImage, 'system2', self.testdata_dir, 1000 None, False) 1001 self.assertRaises( 1002 AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir, 1003 None, False) 1004 1005 @test_utils.SkipIfExternalToolsUnavailable() 1006 def test_GetSparseImage_missingBlockMapFile(self): 1007 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1008 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1009 target_files_zip.write( 1010 test_utils.construct_sparse_image([ 1011 (0xCAC1, 6), 1012 (0xCAC3, 3), 1013 (0xCAC1, 4)]), 1014 arcname='IMAGES/system.img') 1015 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 1016 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1017 1018 tempdir = common.UnzipTemp(target_files) 1019 with zipfile.ZipFile(target_files, 'r') as input_zip: 1020 self.assertRaises( 1021 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 1022 False) 1023 1024 @test_utils.SkipIfExternalToolsUnavailable() 1025 def test_GetSparseImage_sharedBlocks_notAllowed(self): 1026 """Tests the case of having overlapping blocks but disallowed.""" 1027 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1028 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1029 target_files_zip.write( 1030 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1031 arcname='IMAGES/system.img') 1032 # Block 10 is shared between two files. 1033 target_files_zip.writestr( 1034 'IMAGES/system.map', 1035 '\n'.join([ 1036 '/system/file1 1-5 9-10', 1037 '/system/file2 10-12'])) 1038 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1039 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1040 1041 tempdir = common.UnzipTemp(target_files) 1042 with zipfile.ZipFile(target_files, 'r') as input_zip: 1043 self.assertRaises( 1044 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 1045 False) 1046 1047 @test_utils.SkipIfExternalToolsUnavailable() 1048 def test_GetSparseImage_sharedBlocks_allowed(self): 1049 """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true.""" 1050 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1051 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1052 # Construct an image with a care_map of "0-5 9-12". 1053 target_files_zip.write( 1054 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1055 arcname='IMAGES/system.img') 1056 # Block 10 is shared between two files. 1057 target_files_zip.writestr( 1058 'IMAGES/system.map', 1059 '\n'.join([ 1060 '/system/file1 1-5 9-10', 1061 '/system/file2 10-12'])) 1062 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1063 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1064 1065 tempdir = common.UnzipTemp(target_files) 1066 with zipfile.ZipFile(target_files, 'r') as input_zip: 1067 sparse_image = common.GetSparseImage('system', tempdir, input_zip, True) 1068 1069 self.assertDictEqual( 1070 { 1071 '__COPY': RangeSet("0"), 1072 '__NONZERO-0': RangeSet("6-8 13-15"), 1073 '/system/file1': RangeSet("1-5 9-10"), 1074 '/system/file2': RangeSet("11-12"), 1075 }, 1076 sparse_image.file_map) 1077 1078 # '/system/file2' should be marked with 'uses_shared_blocks', but not with 1079 # 'incomplete'. 1080 self.assertTrue( 1081 sparse_image.file_map['/system/file2'].extra['uses_shared_blocks']) 1082 self.assertNotIn( 1083 'incomplete', sparse_image.file_map['/system/file2'].extra) 1084 1085 # '/system/file1' will only contain one field -- a copy of the input text. 1086 self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra)) 1087 1088 # Meta entries should not have any extra tag. 1089 self.assertFalse(sparse_image.file_map['__COPY'].extra) 1090 self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra) 1091 1092 @test_utils.SkipIfExternalToolsUnavailable() 1093 def test_GetSparseImage_incompleteRanges(self): 1094 """Tests the case of ext4 images with holes.""" 1095 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1096 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1097 target_files_zip.write( 1098 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1099 arcname='IMAGES/system.img') 1100 target_files_zip.writestr( 1101 'IMAGES/system.map', 1102 '\n'.join([ 1103 '/system/file1 1-5 9-10', 1104 '/system/file2 11-12'])) 1105 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1106 # '/system/file2' has less blocks listed (2) than actual (3). 1107 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1108 1109 tempdir = common.UnzipTemp(target_files) 1110 with zipfile.ZipFile(target_files, 'r') as input_zip: 1111 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1112 1113 self.assertEqual( 1114 '1-5 9-10', 1115 sparse_image.file_map['/system/file1'].extra['text_str']) 1116 self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete']) 1117 1118 @test_utils.SkipIfExternalToolsUnavailable() 1119 def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self): 1120 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1121 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1122 target_files_zip.write( 1123 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1124 arcname='IMAGES/system.img') 1125 target_files_zip.writestr( 1126 'IMAGES/system.map', 1127 '\n'.join([ 1128 '//system/file1 1-5 9-10', 1129 '//system/file2 11-12', 1130 '/system/app/file3 13-15'])) 1131 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1132 # '/system/file2' has less blocks listed (2) than actual (3). 1133 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1134 # '/system/app/file3' has less blocks listed (3) than actual (4). 1135 target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4)) 1136 1137 tempdir = common.UnzipTemp(target_files) 1138 with zipfile.ZipFile(target_files, 'r') as input_zip: 1139 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1140 1141 self.assertEqual( 1142 '1-5 9-10', 1143 sparse_image.file_map['//system/file1'].extra['text_str']) 1144 self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete']) 1145 self.assertTrue( 1146 sparse_image.file_map['/system/app/file3'].extra['incomplete']) 1147 1148 @test_utils.SkipIfExternalToolsUnavailable() 1149 def test_GetSparseImage_systemRootImage_nonSystemFiles(self): 1150 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1151 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1152 target_files_zip.write( 1153 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1154 arcname='IMAGES/system.img') 1155 target_files_zip.writestr( 1156 'IMAGES/system.map', 1157 '\n'.join([ 1158 '//system/file1 1-5 9-10', 1159 '//init.rc 13-15'])) 1160 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1161 # '/init.rc' has less blocks listed (3) than actual (4). 1162 target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4)) 1163 1164 tempdir = common.UnzipTemp(target_files) 1165 with zipfile.ZipFile(target_files, 'r') as input_zip: 1166 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1167 1168 self.assertEqual( 1169 '1-5 9-10', 1170 sparse_image.file_map['//system/file1'].extra['text_str']) 1171 self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete']) 1172 1173 @test_utils.SkipIfExternalToolsUnavailable() 1174 def test_GetSparseImage_fileNotFound(self): 1175 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1176 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1177 target_files_zip.write( 1178 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1179 arcname='IMAGES/system.img') 1180 target_files_zip.writestr( 1181 'IMAGES/system.map', 1182 '\n'.join([ 1183 '//system/file1 1-5 9-10', 1184 '//system/file2 11-12'])) 1185 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1186 1187 tempdir = common.UnzipTemp(target_files) 1188 with zipfile.ZipFile(target_files, 'r') as input_zip: 1189 self.assertRaises( 1190 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 1191 False) 1192 1193 @test_utils.SkipIfExternalToolsUnavailable() 1194 def test_GetAvbChainedPartitionArg(self): 1195 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 1196 info_dict = { 1197 'avb_avbtool': 'avbtool', 1198 'avb_system_key_path': pubkey, 1199 'avb_system_rollback_index_location': 2, 1200 } 1201 args = common.GetAvbChainedPartitionArg('system', info_dict).split(':') 1202 self.assertEqual(3, len(args)) 1203 self.assertEqual('system', args[0]) 1204 self.assertEqual('2', args[1]) 1205 self.assertTrue(os.path.exists(args[2])) 1206 1207 @test_utils.SkipIfExternalToolsUnavailable() 1208 def test_GetAvbChainedPartitionArg_withPrivateKey(self): 1209 key = os.path.join(self.testdata_dir, 'testkey.key') 1210 info_dict = { 1211 'avb_avbtool': 'avbtool', 1212 'avb_product_key_path': key, 1213 'avb_product_rollback_index_location': 2, 1214 } 1215 args = common.GetAvbChainedPartitionArg('product', info_dict).split(':') 1216 self.assertEqual(3, len(args)) 1217 self.assertEqual('product', args[0]) 1218 self.assertEqual('2', args[1]) 1219 self.assertTrue(os.path.exists(args[2])) 1220 1221 @test_utils.SkipIfExternalToolsUnavailable() 1222 def test_GetAvbChainedPartitionArg_withSpecifiedKey(self): 1223 info_dict = { 1224 'avb_avbtool': 'avbtool', 1225 'avb_system_key_path': 'does-not-exist', 1226 'avb_system_rollback_index_location': 2, 1227 } 1228 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 1229 args = common.GetAvbChainedPartitionArg( 1230 'system', info_dict, pubkey).split(':') 1231 self.assertEqual(3, len(args)) 1232 self.assertEqual('system', args[0]) 1233 self.assertEqual('2', args[1]) 1234 self.assertTrue(os.path.exists(args[2])) 1235 1236 @test_utils.SkipIfExternalToolsUnavailable() 1237 def test_GetAvbChainedPartitionArg_invalidKey(self): 1238 pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem') 1239 info_dict = { 1240 'avb_avbtool': 'avbtool', 1241 'avb_system_key_path': pubkey, 1242 'avb_system_rollback_index_location': 2, 1243 } 1244 self.assertRaises( 1245 common.ExternalError, common.GetAvbChainedPartitionArg, 'system', 1246 info_dict) 1247 1248 INFO_DICT_DEFAULT = { 1249 'recovery_api_version': 3, 1250 'fstab_version': 2, 1251 'system_root_image': 'true', 1252 'no_recovery' : 'true', 1253 'recovery_as_boot': 'true', 1254 } 1255 1256 def test_LoadListFromFile(self): 1257 file_path = os.path.join(self.testdata_dir, 1258 'merge_config_framework_item_list') 1259 contents = common.LoadListFromFile(file_path) 1260 expected_contents = [ 1261 'META/apkcerts.txt', 1262 'META/filesystem_config.txt', 1263 'META/root_filesystem_config.txt', 1264 'META/system_manifest.xml', 1265 'META/system_matrix.xml', 1266 'META/update_engine_config.txt', 1267 'PRODUCT/*', 1268 'ROOT/*', 1269 'SYSTEM/*', 1270 ] 1271 self.assertEqual(sorted(contents), sorted(expected_contents)) 1272 1273 @staticmethod 1274 def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path): 1275 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1276 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 1277 info_values = ''.join( 1278 ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())]) 1279 common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values) 1280 1281 FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults" 1282 if info_dict.get('system_root_image') == 'true': 1283 fstab_values = FSTAB_TEMPLATE.format('/') 1284 else: 1285 fstab_values = FSTAB_TEMPLATE.format('/system') 1286 common.ZipWriteStr(target_files_zip, fstab_path, fstab_values) 1287 1288 common.ZipWriteStr( 1289 target_files_zip, 'META/file_contexts', 'file-contexts') 1290 return target_files 1291 1292 def test_LoadInfoDict(self): 1293 target_files = self._test_LoadInfoDict_createTargetFiles( 1294 self.INFO_DICT_DEFAULT, 1295 'BOOT/RAMDISK/system/etc/recovery.fstab') 1296 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1297 loaded_dict = common.LoadInfoDict(target_files_zip) 1298 self.assertEqual(3, loaded_dict['recovery_api_version']) 1299 self.assertEqual(2, loaded_dict['fstab_version']) 1300 self.assertIn('/', loaded_dict['fstab']) 1301 self.assertIn('/system', loaded_dict['fstab']) 1302 1303 def test_LoadInfoDict_legacyRecoveryFstabPath(self): 1304 target_files = self._test_LoadInfoDict_createTargetFiles( 1305 self.INFO_DICT_DEFAULT, 1306 'BOOT/RAMDISK/etc/recovery.fstab') 1307 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1308 loaded_dict = common.LoadInfoDict(target_files_zip) 1309 self.assertEqual(3, loaded_dict['recovery_api_version']) 1310 self.assertEqual(2, loaded_dict['fstab_version']) 1311 self.assertIn('/', loaded_dict['fstab']) 1312 self.assertIn('/system', loaded_dict['fstab']) 1313 1314 @test_utils.SkipIfExternalToolsUnavailable() 1315 def test_LoadInfoDict_dirInput(self): 1316 target_files = self._test_LoadInfoDict_createTargetFiles( 1317 self.INFO_DICT_DEFAULT, 1318 'BOOT/RAMDISK/system/etc/recovery.fstab') 1319 unzipped = common.UnzipTemp(target_files) 1320 loaded_dict = common.LoadInfoDict(unzipped) 1321 self.assertEqual(3, loaded_dict['recovery_api_version']) 1322 self.assertEqual(2, loaded_dict['fstab_version']) 1323 self.assertIn('/', loaded_dict['fstab']) 1324 self.assertIn('/system', loaded_dict['fstab']) 1325 1326 @test_utils.SkipIfExternalToolsUnavailable() 1327 def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self): 1328 target_files = self._test_LoadInfoDict_createTargetFiles( 1329 self.INFO_DICT_DEFAULT, 1330 'BOOT/RAMDISK/system/etc/recovery.fstab') 1331 unzipped = common.UnzipTemp(target_files) 1332 loaded_dict = common.LoadInfoDict(unzipped) 1333 self.assertEqual(3, loaded_dict['recovery_api_version']) 1334 self.assertEqual(2, loaded_dict['fstab_version']) 1335 self.assertIn('/', loaded_dict['fstab']) 1336 self.assertIn('/system', loaded_dict['fstab']) 1337 1338 def test_LoadInfoDict_systemRootImageFalse(self): 1339 # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices 1340 # launched prior to P will likely have this config. 1341 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 1342 del info_dict['no_recovery'] 1343 del info_dict['system_root_image'] 1344 del info_dict['recovery_as_boot'] 1345 target_files = self._test_LoadInfoDict_createTargetFiles( 1346 info_dict, 1347 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 1348 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1349 loaded_dict = common.LoadInfoDict(target_files_zip) 1350 self.assertEqual(3, loaded_dict['recovery_api_version']) 1351 self.assertEqual(2, loaded_dict['fstab_version']) 1352 self.assertNotIn('/', loaded_dict['fstab']) 1353 self.assertIn('/system', loaded_dict['fstab']) 1354 1355 def test_LoadInfoDict_recoveryAsBootFalse(self): 1356 # Devices using system-as-root, but with standalone recovery image. Non-A/B 1357 # devices launched since P will likely have this config. 1358 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 1359 del info_dict['no_recovery'] 1360 del info_dict['recovery_as_boot'] 1361 target_files = self._test_LoadInfoDict_createTargetFiles( 1362 info_dict, 1363 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 1364 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1365 loaded_dict = common.LoadInfoDict(target_files_zip) 1366 self.assertEqual(3, loaded_dict['recovery_api_version']) 1367 self.assertEqual(2, loaded_dict['fstab_version']) 1368 self.assertIn('/', loaded_dict['fstab']) 1369 self.assertIn('/system', loaded_dict['fstab']) 1370 1371 def test_LoadInfoDict_noRecoveryTrue(self): 1372 # Device doesn't have a recovery partition at all. 1373 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 1374 del info_dict['recovery_as_boot'] 1375 target_files = self._test_LoadInfoDict_createTargetFiles( 1376 info_dict, 1377 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 1378 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1379 loaded_dict = common.LoadInfoDict(target_files_zip) 1380 self.assertEqual(3, loaded_dict['recovery_api_version']) 1381 self.assertEqual(2, loaded_dict['fstab_version']) 1382 self.assertIsNone(loaded_dict['fstab']) 1383 1384 @test_utils.SkipIfExternalToolsUnavailable() 1385 def test_LoadInfoDict_missingMetaMiscInfoTxt(self): 1386 target_files = self._test_LoadInfoDict_createTargetFiles( 1387 self.INFO_DICT_DEFAULT, 1388 'BOOT/RAMDISK/system/etc/recovery.fstab') 1389 common.ZipDelete(target_files, 'META/misc_info.txt') 1390 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1391 self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip) 1392 1393 @test_utils.SkipIfExternalToolsUnavailable() 1394 def test_LoadInfoDict_repacking(self): 1395 target_files = self._test_LoadInfoDict_createTargetFiles( 1396 self.INFO_DICT_DEFAULT, 1397 'BOOT/RAMDISK/system/etc/recovery.fstab') 1398 unzipped = common.UnzipTemp(target_files) 1399 loaded_dict = common.LoadInfoDict(unzipped, True) 1400 self.assertEqual(3, loaded_dict['recovery_api_version']) 1401 self.assertEqual(2, loaded_dict['fstab_version']) 1402 self.assertIn('/', loaded_dict['fstab']) 1403 self.assertIn('/system', loaded_dict['fstab']) 1404 self.assertEqual( 1405 os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir']) 1406 self.assertEqual( 1407 os.path.join(unzipped, 'META', 'root_filesystem_config.txt'), 1408 loaded_dict['root_fs_config']) 1409 1410 def test_LoadInfoDict_repackingWithZipFileInput(self): 1411 target_files = self._test_LoadInfoDict_createTargetFiles( 1412 self.INFO_DICT_DEFAULT, 1413 'BOOT/RAMDISK/system/etc/recovery.fstab') 1414 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1415 self.assertRaises( 1416 AssertionError, common.LoadInfoDict, target_files_zip, True) 1417 1418 def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self): 1419 framework_dict = { 1420 'super_partition_groups': 'group_a', 1421 'dynamic_partition_list': 'system', 1422 'super_group_a_partition_list': 'system', 1423 } 1424 vendor_dict = { 1425 'super_partition_groups': 'group_a group_b', 1426 'dynamic_partition_list': 'vendor product', 1427 'super_group_a_partition_list': 'vendor', 1428 'super_group_a_group_size': '1000', 1429 'super_group_b_partition_list': 'product', 1430 'super_group_b_group_size': '2000', 1431 } 1432 merged_dict = common.MergeDynamicPartitionInfoDicts( 1433 framework_dict=framework_dict, 1434 vendor_dict=vendor_dict) 1435 expected_merged_dict = { 1436 'super_partition_groups': 'group_a group_b', 1437 'dynamic_partition_list': 'system vendor product', 1438 'super_group_a_partition_list': 'system vendor', 1439 'super_group_a_group_size': '1000', 1440 'super_group_b_partition_list': 'product', 1441 'super_group_b_group_size': '2000', 1442 } 1443 self.assertEqual(merged_dict, expected_merged_dict) 1444 1445 def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self): 1446 framework_dict = { 1447 'super_partition_groups': 'group_a', 1448 'dynamic_partition_list': 'system', 1449 'super_group_a_partition_list': 'system', 1450 'super_group_a_group_size': '5000', 1451 } 1452 vendor_dict = { 1453 'super_partition_groups': 'group_a group_b', 1454 'dynamic_partition_list': 'vendor product', 1455 'super_group_a_partition_list': 'vendor', 1456 'super_group_a_group_size': '1000', 1457 'super_group_b_partition_list': 'product', 1458 'super_group_b_group_size': '2000', 1459 } 1460 merged_dict = common.MergeDynamicPartitionInfoDicts( 1461 framework_dict=framework_dict, 1462 vendor_dict=vendor_dict) 1463 expected_merged_dict = { 1464 'super_partition_groups': 'group_a group_b', 1465 'dynamic_partition_list': 'system vendor product', 1466 'super_group_a_partition_list': 'system vendor', 1467 'super_group_a_group_size': '1000', 1468 'super_group_b_partition_list': 'product', 1469 'super_group_b_group_size': '2000', 1470 } 1471 self.assertEqual(merged_dict, expected_merged_dict) 1472 1473 def test_GetAvbPartitionArg(self): 1474 info_dict = {} 1475 cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict) 1476 self.assertEqual( 1477 ['--include_descriptors_from_image', '/path/to/system.img'], cmd) 1478 1479 @test_utils.SkipIfExternalToolsUnavailable() 1480 def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self): 1481 testdata_dir = test_utils.get_testdata_dir() 1482 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem') 1483 info_dict = { 1484 'avb_avbtool': 'avbtool', 1485 'avb_vendor_key_path': pubkey, 1486 'avb_vendor_rollback_index_location': 5, 1487 } 1488 cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict) 1489 self.assertEqual(2, len(cmd)) 1490 self.assertEqual('--chain_partition', cmd[0]) 1491 chained_partition_args = cmd[1].split(':') 1492 self.assertEqual(3, len(chained_partition_args)) 1493 self.assertEqual('vendor', chained_partition_args[0]) 1494 self.assertEqual('5', chained_partition_args[1]) 1495 self.assertTrue(os.path.exists(chained_partition_args[2])) 1496 1497 @test_utils.SkipIfExternalToolsUnavailable() 1498 def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self): 1499 testdata_dir = test_utils.get_testdata_dir() 1500 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem') 1501 info_dict = { 1502 'avb_avbtool': 'avbtool', 1503 'avb_recovery_key_path': pubkey, 1504 'avb_recovery_rollback_index_location': 3, 1505 } 1506 cmd = common.GetAvbPartitionArg( 1507 'recovery', '/path/to/recovery.img', info_dict) 1508 self.assertFalse(cmd) 1509 1510 @test_utils.SkipIfExternalToolsUnavailable() 1511 def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self): 1512 testdata_dir = test_utils.get_testdata_dir() 1513 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem') 1514 info_dict = { 1515 'ab_update': 'true', 1516 'avb_avbtool': 'avbtool', 1517 'avb_recovery_key_path': pubkey, 1518 'avb_recovery_rollback_index_location': 3, 1519 } 1520 cmd = common.GetAvbPartitionArg( 1521 'recovery', '/path/to/recovery.img', info_dict) 1522 self.assertEqual(2, len(cmd)) 1523 self.assertEqual('--chain_partition', cmd[0]) 1524 chained_partition_args = cmd[1].split(':') 1525 self.assertEqual(3, len(chained_partition_args)) 1526 self.assertEqual('recovery', chained_partition_args[0]) 1527 self.assertEqual('3', chained_partition_args[1]) 1528 self.assertTrue(os.path.exists(chained_partition_args[2])) 1529 1530 1531class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase): 1532 """Checks the format of install-recovery.sh. 1533 1534 Its format should match between common.py and validate_target_files.py. 1535 """ 1536 1537 def setUp(self): 1538 self._tempdir = common.MakeTempDir() 1539 # Create a dummy dict that contains the fstab info for boot&recovery. 1540 self._info = {"fstab" : {}} 1541 dummy_fstab = [ 1542 "/dev/soc.0/by-name/boot /boot emmc defaults defaults", 1543 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"] 1544 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab) 1545 # Construct the gzipped recovery.img and boot.img 1546 self.recovery_data = bytearray([ 1547 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a, 1548 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3, 1549 0x08, 0x00, 0x00, 0x00 1550 ]) 1551 # echo -n "boot" | gzip -f | hd 1552 self.boot_data = bytearray([ 1553 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca, 1554 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00 1555 ]) 1556 1557 def _out_tmp_sink(self, name, data, prefix="SYSTEM"): 1558 loc = os.path.join(self._tempdir, prefix, name) 1559 if not os.path.exists(os.path.dirname(loc)): 1560 os.makedirs(os.path.dirname(loc)) 1561 with open(loc, "wb") as f: 1562 f.write(data) 1563 1564 def test_full_recovery(self): 1565 recovery_image = common.File("recovery.img", self.recovery_data) 1566 boot_image = common.File("boot.img", self.boot_data) 1567 self._info["full_recovery_image"] = "true" 1568 1569 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1570 recovery_image, boot_image, self._info) 1571 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1572 self._info) 1573 1574 @test_utils.SkipIfExternalToolsUnavailable() 1575 def test_recovery_from_boot(self): 1576 recovery_image = common.File("recovery.img", self.recovery_data) 1577 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES") 1578 boot_image = common.File("boot.img", self.boot_data) 1579 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES") 1580 1581 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1582 recovery_image, boot_image, self._info) 1583 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1584 self._info) 1585 # Validate 'recovery-from-boot' with bonus argument. 1586 self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM") 1587 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1588 recovery_image, boot_image, self._info) 1589 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1590 self._info) 1591 1592 1593class MockBlockDifference(object): 1594 1595 def __init__(self, partition, tgt, src=None): 1596 self.partition = partition 1597 self.tgt = tgt 1598 self.src = src 1599 1600 def WriteScript(self, script, _, progress=None, 1601 write_verify_script=False): 1602 if progress: 1603 script.AppendExtra("progress({})".format(progress)) 1604 script.AppendExtra("patch({});".format(self.partition)) 1605 if write_verify_script: 1606 self.WritePostInstallVerifyScript(script) 1607 1608 def WritePostInstallVerifyScript(self, script): 1609 script.AppendExtra("verify({});".format(self.partition)) 1610 1611 1612class FakeSparseImage(object): 1613 1614 def __init__(self, size): 1615 self.blocksize = 4096 1616 self.total_blocks = size // 4096 1617 assert size % 4096 == 0, "{} is not a multiple of 4096".format(size) 1618 1619 1620class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase): 1621 1622 @staticmethod 1623 def get_op_list(output_path): 1624 with zipfile.ZipFile(output_path) as output_zip: 1625 with output_zip.open('dynamic_partitions_op_list') as op_list: 1626 return [line.decode().strip() for line in op_list.readlines() 1627 if not line.startswith(b'#')] 1628 1629 def setUp(self): 1630 self.script = test_utils.MockScriptWriter() 1631 self.output_path = common.MakeTempFile(suffix='.zip') 1632 1633 def test_full(self): 1634 target_info = common.LoadDictionaryFromLines(""" 1635dynamic_partition_list=system vendor 1636super_partition_groups=group_foo 1637super_group_foo_group_size={group_size} 1638super_group_foo_partition_list=system vendor 1639""".format(group_size=4 * GiB).split("\n")) 1640 block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)), 1641 MockBlockDifference("vendor", FakeSparseImage(1 * GiB))] 1642 1643 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs) 1644 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1645 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1646 1647 self.assertEqual(str(self.script).strip(), """ 1648assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list"))); 1649patch(system); 1650verify(system); 1651unmap_partition("system"); 1652patch(vendor); 1653verify(vendor); 1654unmap_partition("vendor"); 1655""".strip()) 1656 1657 lines = self.get_op_list(self.output_path) 1658 1659 remove_all_groups = lines.index("remove_all_groups") 1660 add_group = lines.index("add_group group_foo 4294967296") 1661 add_vendor = lines.index("add vendor group_foo") 1662 add_system = lines.index("add system group_foo") 1663 resize_vendor = lines.index("resize vendor 1073741824") 1664 resize_system = lines.index("resize system 3221225472") 1665 1666 self.assertLess(remove_all_groups, add_group, 1667 "Should add groups after removing all groups") 1668 self.assertLess(add_group, min(add_vendor, add_system), 1669 "Should add partitions after adding group") 1670 self.assertLess(add_system, resize_system, 1671 "Should resize system after adding it") 1672 self.assertLess(add_vendor, resize_vendor, 1673 "Should resize vendor after adding it") 1674 1675 def test_inc_groups(self): 1676 source_info = common.LoadDictionaryFromLines(""" 1677super_partition_groups=group_foo group_bar group_baz 1678super_group_foo_group_size={group_foo_size} 1679super_group_bar_group_size={group_bar_size} 1680""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n")) 1681 target_info = common.LoadDictionaryFromLines(""" 1682super_partition_groups=group_foo group_baz group_qux 1683super_group_foo_group_size={group_foo_size} 1684super_group_baz_group_size={group_baz_size} 1685super_group_qux_group_size={group_qux_size} 1686""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB, 1687 group_qux_size=1 * GiB).split("\n")) 1688 1689 dp_diff = common.DynamicPartitionsDifference(target_info, 1690 block_diffs=[], 1691 source_info_dict=source_info) 1692 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1693 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1694 1695 lines = self.get_op_list(self.output_path) 1696 1697 removed = lines.index("remove_group group_bar") 1698 shrunk = lines.index("resize_group group_foo 3221225472") 1699 grown = lines.index("resize_group group_baz 4294967296") 1700 added = lines.index("add_group group_qux 1073741824") 1701 1702 self.assertLess(max(removed, shrunk), 1703 min(grown, added), 1704 "ops that remove / shrink partitions must precede ops that " 1705 "grow / add partitions") 1706 1707 def test_incremental(self): 1708 source_info = common.LoadDictionaryFromLines(""" 1709dynamic_partition_list=system vendor product system_ext 1710super_partition_groups=group_foo 1711super_group_foo_group_size={group_foo_size} 1712super_group_foo_partition_list=system vendor product system_ext 1713""".format(group_foo_size=4 * GiB).split("\n")) 1714 target_info = common.LoadDictionaryFromLines(""" 1715dynamic_partition_list=system vendor product odm 1716super_partition_groups=group_foo group_bar 1717super_group_foo_group_size={group_foo_size} 1718super_group_foo_partition_list=system vendor odm 1719super_group_bar_group_size={group_bar_size} 1720super_group_bar_partition_list=product 1721""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n")) 1722 1723 block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB), 1724 src=FakeSparseImage(1024 * MiB)), 1725 MockBlockDifference("vendor", FakeSparseImage(512 * MiB), 1726 src=FakeSparseImage(1024 * MiB)), 1727 MockBlockDifference("product", FakeSparseImage(1024 * MiB), 1728 src=FakeSparseImage(1024 * MiB)), 1729 MockBlockDifference("system_ext", None, 1730 src=FakeSparseImage(1024 * MiB)), 1731 MockBlockDifference("odm", FakeSparseImage(1024 * MiB), 1732 src=None)] 1733 1734 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs, 1735 source_info_dict=source_info) 1736 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1737 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1738 1739 metadata_idx = self.script.lines.index( 1740 'assert(update_dynamic_partitions(package_extract_file(' 1741 '"dynamic_partitions_op_list")));') 1742 self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx) 1743 self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);')) 1744 for p in ("product", "system", "odm"): 1745 patch_idx = self.script.lines.index("patch({});".format(p)) 1746 verify_idx = self.script.lines.index("verify({});".format(p)) 1747 self.assertLess(metadata_idx, patch_idx, 1748 "Should patch {} after updating metadata".format(p)) 1749 self.assertLess(patch_idx, verify_idx, 1750 "Should verify {} after patching".format(p)) 1751 1752 self.assertNotIn("patch(system_ext);", self.script.lines) 1753 1754 lines = self.get_op_list(self.output_path) 1755 1756 remove = lines.index("remove system_ext") 1757 move_product_out = lines.index("move product default") 1758 shrink = lines.index("resize vendor 536870912") 1759 shrink_group = lines.index("resize_group group_foo 3221225472") 1760 add_group_bar = lines.index("add_group group_bar 1073741824") 1761 add_odm = lines.index("add odm group_foo") 1762 grow_existing = lines.index("resize system 1610612736") 1763 grow_added = lines.index("resize odm 1073741824") 1764 move_product_in = lines.index("move product group_bar") 1765 1766 max_idx_move_partition_out_foo = max(remove, move_product_out, shrink) 1767 min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added) 1768 1769 self.assertLess(max_idx_move_partition_out_foo, shrink_group, 1770 "Must shrink group after partitions inside group are shrunk" 1771 " / removed") 1772 1773 self.assertLess(add_group_bar, move_product_in, 1774 "Must add partitions to group after group is added") 1775 1776 self.assertLess(max_idx_move_partition_out_foo, 1777 min_idx_move_partition_in_foo, 1778 "Must shrink partitions / remove partitions from group" 1779 "before adding / moving partitions into group") 1780 1781 def test_remove_partition(self): 1782 source_info = common.LoadDictionaryFromLines(""" 1783blockimgdiff_versions=3,4 1784use_dynamic_partitions=true 1785dynamic_partition_list=foo 1786super_partition_groups=group_foo 1787super_group_foo_group_size={group_foo_size} 1788super_group_foo_partition_list=foo 1789""".format(group_foo_size=4 * GiB).split("\n")) 1790 target_info = common.LoadDictionaryFromLines(""" 1791blockimgdiff_versions=3,4 1792use_dynamic_partitions=true 1793super_partition_groups=group_foo 1794super_group_foo_group_size={group_foo_size} 1795""".format(group_foo_size=4 * GiB).split("\n")) 1796 1797 common.OPTIONS.info_dict = target_info 1798 common.OPTIONS.target_info_dict = target_info 1799 common.OPTIONS.source_info_dict = source_info 1800 common.OPTIONS.cache_size = 4 * 4096 1801 1802 block_diffs = [common.BlockDifference("foo", EmptyImage(), 1803 src=DataImage("source", pad=True))] 1804 1805 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs, 1806 source_info_dict=source_info) 1807 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1808 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1809 1810 self.assertNotIn("block_image_update", str(self.script), 1811 "Removed partition should not be patched.") 1812 1813 lines = self.get_op_list(self.output_path) 1814 self.assertEqual(lines, ["remove foo"]) 1815 1816 1817class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase): 1818 def setUp(self): 1819 self.odm_build_prop = [ 1820 'ro.odm.build.date.utc=1578430045', 1821 'ro.odm.build.fingerprint=' 1822 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1823 'ro.product.odm.device=coral', 1824 'import /odm/etc/build_${ro.boot.product.device_name}.prop', 1825 ] 1826 1827 @staticmethod 1828 def _BuildZipFile(entries): 1829 input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1830 with zipfile.ZipFile(input_file, 'w') as input_zip: 1831 for name, content in entries.items(): 1832 input_zip.writestr(name, content) 1833 1834 return input_file 1835 1836 def test_parseBuildProps_noImportStatement(self): 1837 build_prop = [ 1838 'ro.odm.build.date.utc=1578430045', 1839 'ro.odm.build.fingerprint=' 1840 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1841 'ro.product.odm.device=coral', 1842 ] 1843 input_file = self._BuildZipFile({ 1844 'ODM/etc/build.prop': '\n'.join(build_prop), 1845 }) 1846 1847 with zipfile.ZipFile(input_file, 'r') as input_zip: 1848 placeholder_values = { 1849 'ro.boot.product.device_name': ['std', 'pro'] 1850 } 1851 partition_props = common.PartitionBuildProps.FromInputFile( 1852 input_zip, 'odm', placeholder_values) 1853 1854 self.assertEqual({ 1855 'ro.odm.build.date.utc': '1578430045', 1856 'ro.odm.build.fingerprint': 1857 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1858 'ro.product.odm.device': 'coral', 1859 }, partition_props.build_props) 1860 1861 self.assertEqual(set(), partition_props.prop_overrides) 1862 1863 def test_parseBuildProps_singleImportStatement(self): 1864 build_std_prop = [ 1865 'ro.product.odm.device=coral', 1866 'ro.product.odm.name=product1', 1867 ] 1868 build_pro_prop = [ 1869 'ro.product.odm.device=coralpro', 1870 'ro.product.odm.name=product2', 1871 ] 1872 1873 input_file = self._BuildZipFile({ 1874 'ODM/etc/build.prop': '\n'.join(self.odm_build_prop), 1875 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 1876 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 1877 }) 1878 1879 with zipfile.ZipFile(input_file, 'r') as input_zip: 1880 placeholder_values = { 1881 'ro.boot.product.device_name': 'std' 1882 } 1883 partition_props = common.PartitionBuildProps.FromInputFile( 1884 input_zip, 'odm', placeholder_values) 1885 1886 self.assertEqual({ 1887 'ro.odm.build.date.utc': '1578430045', 1888 'ro.odm.build.fingerprint': 1889 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1890 'ro.product.odm.device': 'coral', 1891 'ro.product.odm.name': 'product1', 1892 }, partition_props.build_props) 1893 1894 with zipfile.ZipFile(input_file, 'r') as input_zip: 1895 placeholder_values = { 1896 'ro.boot.product.device_name': 'pro' 1897 } 1898 partition_props = common.PartitionBuildProps.FromInputFile( 1899 input_zip, 'odm', placeholder_values) 1900 1901 self.assertEqual({ 1902 'ro.odm.build.date.utc': '1578430045', 1903 'ro.odm.build.fingerprint': 1904 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1905 'ro.product.odm.device': 'coralpro', 1906 'ro.product.odm.name': 'product2', 1907 }, partition_props.build_props) 1908 1909 def test_parseBuildProps_noPlaceHolders(self): 1910 build_prop = copy.copy(self.odm_build_prop) 1911 input_file = self._BuildZipFile({ 1912 'ODM/etc/build.prop': '\n'.join(build_prop), 1913 }) 1914 1915 with zipfile.ZipFile(input_file, 'r') as input_zip: 1916 partition_props = common.PartitionBuildProps.FromInputFile( 1917 input_zip, 'odm') 1918 1919 self.assertEqual({ 1920 'ro.odm.build.date.utc': '1578430045', 1921 'ro.odm.build.fingerprint': 1922 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1923 'ro.product.odm.device': 'coral', 1924 }, partition_props.build_props) 1925 1926 self.assertEqual(set(), partition_props.prop_overrides) 1927 1928 def test_parseBuildProps_multipleImportStatements(self): 1929 build_prop = copy.deepcopy(self.odm_build_prop) 1930 build_prop.append( 1931 'import /odm/etc/build_${ro.boot.product.product_name}.prop') 1932 1933 build_std_prop = [ 1934 'ro.product.odm.device=coral', 1935 ] 1936 build_pro_prop = [ 1937 'ro.product.odm.device=coralpro', 1938 ] 1939 1940 product1_prop = [ 1941 'ro.product.odm.name=product1', 1942 'ro.product.not_care=not_care', 1943 ] 1944 1945 product2_prop = [ 1946 'ro.product.odm.name=product2', 1947 'ro.product.not_care=not_care', 1948 ] 1949 1950 input_file = self._BuildZipFile({ 1951 'ODM/etc/build.prop': '\n'.join(build_prop), 1952 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 1953 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 1954 'ODM/etc/build_product1.prop': '\n'.join(product1_prop), 1955 'ODM/etc/build_product2.prop': '\n'.join(product2_prop), 1956 }) 1957 1958 with zipfile.ZipFile(input_file, 'r') as input_zip: 1959 placeholder_values = { 1960 'ro.boot.product.device_name': 'std', 1961 'ro.boot.product.product_name': 'product1', 1962 'ro.boot.product.not_care': 'not_care', 1963 } 1964 partition_props = common.PartitionBuildProps.FromInputFile( 1965 input_zip, 'odm', placeholder_values) 1966 1967 self.assertEqual({ 1968 'ro.odm.build.date.utc': '1578430045', 1969 'ro.odm.build.fingerprint': 1970 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1971 'ro.product.odm.device': 'coral', 1972 'ro.product.odm.name': 'product1' 1973 }, partition_props.build_props) 1974 1975 with zipfile.ZipFile(input_file, 'r') as input_zip: 1976 placeholder_values = { 1977 'ro.boot.product.device_name': 'pro', 1978 'ro.boot.product.product_name': 'product2', 1979 'ro.boot.product.not_care': 'not_care', 1980 } 1981 partition_props = common.PartitionBuildProps.FromInputFile( 1982 input_zip, 'odm', placeholder_values) 1983 1984 self.assertEqual({ 1985 'ro.odm.build.date.utc': '1578430045', 1986 'ro.odm.build.fingerprint': 1987 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 1988 'ro.product.odm.device': 'coralpro', 1989 'ro.product.odm.name': 'product2' 1990 }, partition_props.build_props) 1991 1992 def test_parseBuildProps_defineAfterOverride(self): 1993 build_prop = copy.deepcopy(self.odm_build_prop) 1994 build_prop.append('ro.product.odm.device=coral') 1995 1996 build_std_prop = [ 1997 'ro.product.odm.device=coral', 1998 ] 1999 build_pro_prop = [ 2000 'ro.product.odm.device=coralpro', 2001 ] 2002 2003 input_file = self._BuildZipFile({ 2004 'ODM/etc/build.prop': '\n'.join(build_prop), 2005 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 2006 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 2007 }) 2008 2009 with zipfile.ZipFile(input_file, 'r') as input_zip: 2010 placeholder_values = { 2011 'ro.boot.product.device_name': 'std', 2012 } 2013 2014 self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile, 2015 input_zip, 'odm', placeholder_values) 2016 2017 def test_parseBuildProps_duplicateOverride(self): 2018 build_prop = copy.deepcopy(self.odm_build_prop) 2019 build_prop.append( 2020 'import /odm/etc/build_${ro.boot.product.product_name}.prop') 2021 2022 build_std_prop = [ 2023 'ro.product.odm.device=coral', 2024 'ro.product.odm.name=product1', 2025 ] 2026 build_pro_prop = [ 2027 'ro.product.odm.device=coralpro', 2028 ] 2029 2030 product1_prop = [ 2031 'ro.product.odm.name=product1', 2032 ] 2033 2034 product2_prop = [ 2035 'ro.product.odm.name=product2', 2036 ] 2037 2038 input_file = self._BuildZipFile({ 2039 'ODM/etc/build.prop': '\n'.join(build_prop), 2040 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 2041 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 2042 'ODM/etc/build_product1.prop': '\n'.join(product1_prop), 2043 'ODM/etc/build_product2.prop': '\n'.join(product2_prop), 2044 }) 2045 2046 with zipfile.ZipFile(input_file, 'r') as input_zip: 2047 placeholder_values = { 2048 'ro.boot.product.device_name': 'std', 2049 'ro.boot.product.product_name': 'product1', 2050 } 2051 self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile, 2052 input_zip, 'odm', placeholder_values) 2053