1# 2# Copyright (C) 2015 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import copy 18import json 19import os 20import subprocess 21import tempfile 22import time 23import unittest 24import zipfile 25from hashlib import sha1 26 27import common 28import test_utils 29import validate_target_files 30from images import EmptyImage, DataImage 31from rangelib import RangeSet 32 33 34KiB = 1024 35MiB = 1024 * KiB 36GiB = 1024 * MiB 37 38 39def get_2gb_string(): 40 size = int(2 * GiB + 1) 41 block_size = 4 * KiB 42 step_size = 4 * MiB 43 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'. 44 for _ in range(0, size, step_size): 45 yield os.urandom(block_size) 46 yield b'\0' * (step_size - block_size) 47 48 49class BuildInfoTest(test_utils.ReleaseToolsTestCase): 50 51 TEST_INFO_FINGERPRINT_DICT = { 52 'build.prop': common.PartitionBuildProps.FromDictionary( 53 'system', { 54 'ro.product.brand': 'product-brand', 55 'ro.product.name': 'product-name', 56 'ro.product.device': 'product-device', 57 'ro.build.version.release': 'version-release', 58 'ro.build.id': 'build-id', 59 'ro.build.version.incremental': 'version-incremental', 60 'ro.build.type': 'build-type', 61 'ro.build.tags': 'build-tags', 62 'ro.build.version.sdk': 30, 63 } 64 ), 65 } 66 67 TEST_INFO_DICT = { 68 'build.prop': common.PartitionBuildProps.FromDictionary( 69 'system', { 70 'ro.product.device': 'product-device', 71 'ro.product.name': 'product-name', 72 'ro.build.fingerprint': 'build-fingerprint', 73 'ro.build.foo': 'build-foo'} 74 ), 75 'system.build.prop': common.PartitionBuildProps.FromDictionary( 76 'system', { 77 'ro.product.system.brand': 'product-brand', 78 'ro.product.system.name': 'product-name', 79 'ro.product.system.device': 'product-device', 80 'ro.system.build.version.release': 'version-release', 81 'ro.system.build.id': 'build-id', 82 'ro.system.build.version.incremental': 'version-incremental', 83 'ro.system.build.type': 'build-type', 84 'ro.system.build.tags': 'build-tags', 85 'ro.system.build.foo': 'build-foo'} 86 ), 87 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 88 'vendor', { 89 'ro.product.vendor.brand': 'vendor-product-brand', 90 'ro.product.vendor.name': 'vendor-product-name', 91 'ro.product.vendor.device': 'vendor-product-device', 92 'ro.vendor.build.version.release': 'vendor-version-release', 93 'ro.vendor.build.id': 'vendor-build-id', 94 'ro.vendor.build.version.incremental': 95 'vendor-version-incremental', 96 'ro.vendor.build.type': 'vendor-build-type', 97 'ro.vendor.build.tags': 'vendor-build-tags'} 98 ), 99 'property1': 'value1', 100 'property2': 4096, 101 } 102 103 TEST_INFO_DICT_USES_OEM_PROPS = { 104 'build.prop': common.PartitionBuildProps.FromDictionary( 105 'system', { 106 'ro.product.name': 'product-name', 107 'ro.build.thumbprint': 'build-thumbprint', 108 'ro.build.bar': 'build-bar'} 109 ), 110 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 111 'vendor', { 112 'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'} 113 ), 114 'property1': 'value1', 115 'property2': 4096, 116 'oem_fingerprint_properties': 'ro.product.device ro.product.brand', 117 } 118 119 TEST_OEM_DICTS = [ 120 { 121 'ro.product.brand': 'brand1', 122 'ro.product.device': 'device1', 123 }, 124 { 125 'ro.product.brand': 'brand2', 126 'ro.product.device': 'device2', 127 }, 128 { 129 'ro.product.brand': 'brand3', 130 'ro.product.device': 'device3', 131 }, 132 ] 133 134 TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = { 135 'build.prop': common.PartitionBuildProps.FromDictionary( 136 'system', { 137 'ro.build.fingerprint': 'build-fingerprint', 138 'ro.product.property_source_order': 139 'product,odm,vendor,system_ext,system'} 140 ), 141 'system.build.prop': common.PartitionBuildProps.FromDictionary( 142 'system', { 143 'ro.product.system.device': 'system-product-device'} 144 ), 145 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 146 'vendor', { 147 'ro.product.vendor.device': 'vendor-product-device'} 148 ), 149 } 150 151 TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = { 152 'build.prop': common.PartitionBuildProps.FromDictionary( 153 'system', { 154 'ro.build.fingerprint': 'build-fingerprint', 155 'ro.product.property_source_order': 156 'product,product_services,odm,vendor,system', 157 'ro.build.version.release': '10', 158 'ro.build.version.codename': 'REL'} 159 ), 160 'system.build.prop': common.PartitionBuildProps.FromDictionary( 161 'system', { 162 'ro.product.system.device': 'system-product-device'} 163 ), 164 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 165 'vendor', { 166 'ro.product.vendor.device': 'vendor-product-device'} 167 ), 168 } 169 170 TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = { 171 'build.prop': common.PartitionBuildProps.FromDictionary( 172 'system', { 173 'ro.product.device': 'product-device', 174 'ro.build.fingerprint': 'build-fingerprint', 175 'ro.build.version.release': '9', 176 'ro.build.version.codename': 'REL'} 177 ), 178 'system.build.prop': common.PartitionBuildProps.FromDictionary( 179 'system', { 180 'ro.product.system.device': 'system-product-device'} 181 ), 182 'vendor.build.prop': common.PartitionBuildProps.FromDictionary( 183 'vendor', { 184 'ro.product.vendor.device': 'vendor-product-device'} 185 ), 186 } 187 188 def test_init(self): 189 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 190 self.assertEqual('product-device', target_info.device) 191 self.assertEqual('build-fingerprint', target_info.fingerprint) 192 self.assertFalse(target_info.is_ab) 193 self.assertIsNone(target_info.oem_props) 194 195 def test_init_with_oem_props(self): 196 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 197 self.TEST_OEM_DICTS) 198 self.assertEqual('device1', target_info.device) 199 self.assertEqual('brand1/product-name/device1:build-thumbprint', 200 target_info.fingerprint) 201 202 # Swap the order in oem_dicts, which would lead to different BuildInfo. 203 oem_dicts = copy.copy(self.TEST_OEM_DICTS) 204 oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0] 205 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 206 oem_dicts) 207 self.assertEqual('device3', target_info.device) 208 self.assertEqual('brand3/product-name/device3:build-thumbprint', 209 target_info.fingerprint) 210 211 def test_init_badFingerprint(self): 212 info_dict = copy.deepcopy(self.TEST_INFO_DICT) 213 info_dict['build.prop'].build_props[ 214 'ro.build.fingerprint'] = 'bad fingerprint' 215 self.assertRaises(ValueError, common.BuildInfo, info_dict, None) 216 217 info_dict['build.prop'].build_props[ 218 'ro.build.fingerprint'] = 'bad\x80fingerprint' 219 self.assertRaises(ValueError, common.BuildInfo, info_dict, None) 220 221 def test_init_goodFingerprint(self): 222 info_dict = copy.deepcopy(self.TEST_INFO_FINGERPRINT_DICT) 223 build_info = common.BuildInfo(info_dict) 224 self.assertEqual( 225 'product-brand/product-name/product-device:version-release/build-id/' 226 'version-incremental:build-type/build-tags', build_info.fingerprint) 227 228 build_props = info_dict['build.prop'].build_props 229 del build_props['ro.build.id'] 230 build_props['ro.build.legacy.id'] = 'legacy-build-id' 231 build_info = common.BuildInfo(info_dict, use_legacy_id=True) 232 self.assertEqual( 233 'product-brand/product-name/product-device:version-release/' 234 'legacy-build-id/version-incremental:build-type/build-tags', 235 build_info.fingerprint) 236 237 self.assertRaises(common.ExternalError, common.BuildInfo, info_dict, None, 238 False) 239 240 info_dict['avb_enable'] = 'true' 241 info_dict['vbmeta_digest'] = 'abcde12345' 242 build_info = common.BuildInfo(info_dict, use_legacy_id=False) 243 self.assertEqual( 244 'product-brand/product-name/product-device:version-release/' 245 'legacy-build-id.abcde123/version-incremental:build-type/build-tags', 246 build_info.fingerprint) 247 248 def test___getitem__(self): 249 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 250 self.assertEqual('value1', target_info['property1']) 251 self.assertEqual(4096, target_info['property2']) 252 self.assertEqual('build-foo', 253 target_info['build.prop'].GetProp('ro.build.foo')) 254 255 def test___getitem__with_oem_props(self): 256 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 257 self.TEST_OEM_DICTS) 258 self.assertEqual('value1', target_info['property1']) 259 self.assertEqual(4096, target_info['property2']) 260 self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo')) 261 262 def test___setitem__(self): 263 target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None) 264 self.assertEqual('value1', target_info['property1']) 265 target_info['property1'] = 'value2' 266 self.assertEqual('value2', target_info['property1']) 267 268 self.assertEqual('build-foo', 269 target_info['build.prop'].GetProp('ro.build.foo')) 270 target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar' 271 self.assertEqual('build-bar', 272 target_info['build.prop'].GetProp('ro.build.foo')) 273 274 def test_get(self): 275 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 276 self.assertEqual('value1', target_info.get('property1')) 277 self.assertEqual(4096, target_info.get('property2')) 278 self.assertEqual(4096, target_info.get('property2', 1024)) 279 self.assertEqual(1024, target_info.get('property-nonexistent', 1024)) 280 self.assertEqual('build-foo', 281 target_info.get('build.prop').GetProp('ro.build.foo')) 282 283 def test_get_with_oem_props(self): 284 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 285 self.TEST_OEM_DICTS) 286 self.assertEqual('value1', target_info.get('property1')) 287 self.assertEqual(4096, target_info.get('property2')) 288 self.assertEqual(4096, target_info.get('property2', 1024)) 289 self.assertEqual(1024, target_info.get('property-nonexistent', 1024)) 290 self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo')) 291 292 def test_items(self): 293 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 294 items = target_info.items() 295 self.assertIn(('property1', 'value1'), items) 296 self.assertIn(('property2', 4096), items) 297 298 def test_GetBuildProp(self): 299 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 300 self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo')) 301 self.assertRaises(common.ExternalError, target_info.GetBuildProp, 302 'ro.build.nonexistent') 303 304 def test_GetBuildProp_with_oem_props(self): 305 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 306 self.TEST_OEM_DICTS) 307 self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar')) 308 self.assertRaises(common.ExternalError, target_info.GetBuildProp, 309 'ro.build.nonexistent') 310 311 def test_GetPartitionFingerprint(self): 312 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 313 self.assertEqual( 314 target_info.GetPartitionFingerprint('vendor'), 315 'vendor-product-brand/vendor-product-name/vendor-product-device' 316 ':vendor-version-release/vendor-build-id/vendor-version-incremental' 317 ':vendor-build-type/vendor-build-tags') 318 319 def test_GetPartitionFingerprint_system_other_uses_system(self): 320 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 321 self.assertEqual( 322 target_info.GetPartitionFingerprint('system_other'), 323 target_info.GetPartitionFingerprint('system')) 324 325 def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self): 326 info_dict = copy.deepcopy(self.TEST_INFO_DICT) 327 info_dict['vendor.build.prop'].build_props[ 328 'ro.vendor.build.fingerprint'] = 'vendor:fingerprint' 329 target_info = common.BuildInfo(info_dict, None) 330 self.assertEqual( 331 target_info.GetPartitionFingerprint('vendor'), 332 'vendor:fingerprint') 333 334 def test_WriteMountOemScript(self): 335 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 336 self.TEST_OEM_DICTS) 337 script_writer = test_utils.MockScriptWriter() 338 target_info.WriteMountOemScript(script_writer) 339 self.assertEqual([('Mount', '/oem', None)], script_writer.lines) 340 341 def test_WriteDeviceAssertions(self): 342 target_info = common.BuildInfo(self.TEST_INFO_DICT, None) 343 script_writer = test_utils.MockScriptWriter() 344 target_info.WriteDeviceAssertions(script_writer, False) 345 self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines) 346 347 def test_WriteDeviceAssertions_with_oem_props(self): 348 target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, 349 self.TEST_OEM_DICTS) 350 script_writer = test_utils.MockScriptWriter() 351 target_info.WriteDeviceAssertions(script_writer, False) 352 self.assertEqual( 353 [ 354 ('AssertOemProperty', 'ro.product.device', 355 ['device1', 'device2', 'device3'], False), 356 ('AssertOemProperty', 'ro.product.brand', 357 ['brand1', 'brand2', 'brand3'], False), 358 ], 359 script_writer.lines) 360 361 def test_ResolveRoProductProperty_FromVendor(self): 362 info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER) 363 info = common.BuildInfo(info_dict, None) 364 self.assertEqual('vendor-product-device', 365 info.GetBuildProp('ro.product.device')) 366 367 def test_ResolveRoProductProperty_FromSystem(self): 368 info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER) 369 del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device'] 370 info = common.BuildInfo(info_dict, None) 371 self.assertEqual('system-product-device', 372 info.GetBuildProp('ro.product.device')) 373 374 def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self): 375 info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER) 376 info_dict['build.prop'].build_props[ 377 'ro.product.property_source_order'] = 'bad-source' 378 with self.assertRaisesRegexp(common.ExternalError, 379 'Invalid ro.product.property_source_order'): 380 info = common.BuildInfo(info_dict, None) 381 info.GetBuildProp('ro.product.device') 382 383 def test_ResolveRoProductProperty_Android10PropertySearchOrder(self): 384 info_dict = copy.deepcopy( 385 self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10) 386 info = common.BuildInfo(info_dict, None) 387 self.assertEqual('vendor-product-device', 388 info.GetBuildProp('ro.product.device')) 389 390 def test_ResolveRoProductProperty_Android9PropertySearchOrder(self): 391 info_dict = copy.deepcopy( 392 self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9) 393 info = common.BuildInfo(info_dict, None) 394 self.assertEqual('product-device', 395 info.GetBuildProp('ro.product.device')) 396 397 398class CommonZipTest(test_utils.ReleaseToolsTestCase): 399 400 def _verify(self, zip_file, zip_file_name, arcname, expected_hash, 401 test_file_name=None, expected_stat=None, expected_mode=0o644, 402 expected_compress_type=zipfile.ZIP_STORED): 403 # Verify the stat if present. 404 if test_file_name is not None: 405 new_stat = os.stat(test_file_name) 406 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode)) 407 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime)) 408 409 # Reopen the zip file to verify. 410 zip_file = zipfile.ZipFile(zip_file_name, "r", allowZip64=True) 411 412 # Verify the timestamp. 413 info = zip_file.getinfo(arcname) 414 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0)) 415 416 # Verify the file mode. 417 mode = (info.external_attr >> 16) & 0o777 418 self.assertEqual(mode, expected_mode) 419 420 # Verify the compress type. 421 self.assertEqual(info.compress_type, expected_compress_type) 422 423 # Verify the zip contents. 424 entry = zip_file.open(arcname) 425 sha1_hash = sha1() 426 for chunk in iter(lambda: entry.read(4 * MiB), b''): 427 sha1_hash.update(chunk) 428 self.assertEqual(expected_hash, sha1_hash.hexdigest()) 429 self.assertIsNone(zip_file.testzip()) 430 431 def _test_ZipWrite(self, contents, extra_zipwrite_args=None): 432 extra_zipwrite_args = dict(extra_zipwrite_args or {}) 433 434 test_file = tempfile.NamedTemporaryFile(delete=False) 435 test_file_name = test_file.name 436 437 zip_file = tempfile.NamedTemporaryFile(delete=False) 438 zip_file_name = zip_file.name 439 440 # File names within an archive strip the leading slash. 441 arcname = extra_zipwrite_args.get("arcname", test_file_name) 442 if arcname[0] == "/": 443 arcname = arcname[1:] 444 445 zip_file.close() 446 zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True) 447 448 try: 449 sha1_hash = sha1() 450 for data in contents: 451 sha1_hash.update(bytes(data)) 452 test_file.write(bytes(data)) 453 test_file.close() 454 455 expected_stat = os.stat(test_file_name) 456 expected_mode = extra_zipwrite_args.get("perms", 0o644) 457 expected_compress_type = extra_zipwrite_args.get("compress_type", 458 zipfile.ZIP_STORED) 459 time.sleep(5) # Make sure the atime/mtime will change measurably. 460 461 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args) 462 common.ZipClose(zip_file) 463 464 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(), 465 test_file_name, expected_stat, expected_mode, 466 expected_compress_type) 467 finally: 468 os.remove(test_file_name) 469 os.remove(zip_file_name) 470 471 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None): 472 extra_args = dict(extra_args or {}) 473 474 zip_file = tempfile.NamedTemporaryFile(delete=False) 475 zip_file_name = zip_file.name 476 zip_file.close() 477 478 zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True) 479 480 try: 481 expected_compress_type = extra_args.get("compress_type", 482 zipfile.ZIP_STORED) 483 time.sleep(5) # Make sure the atime/mtime will change measurably. 484 485 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo): 486 arcname = zinfo_or_arcname 487 expected_mode = extra_args.get("perms", 0o644) 488 else: 489 arcname = zinfo_or_arcname.filename 490 if zinfo_or_arcname.external_attr: 491 zinfo_perms = zinfo_or_arcname.external_attr >> 16 492 else: 493 zinfo_perms = 0o600 494 expected_mode = extra_args.get("perms", zinfo_perms) 495 496 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args) 497 common.ZipClose(zip_file) 498 499 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(), 500 expected_mode=expected_mode, 501 expected_compress_type=expected_compress_type) 502 finally: 503 os.remove(zip_file_name) 504 505 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None): 506 extra_args = dict(extra_args or {}) 507 508 zip_file = tempfile.NamedTemporaryFile(delete=False) 509 zip_file_name = zip_file.name 510 511 test_file = tempfile.NamedTemporaryFile(delete=False) 512 test_file_name = test_file.name 513 514 arcname_large = test_file_name 515 arcname_small = "bar" 516 517 # File names within an archive strip the leading slash. 518 if arcname_large[0] == "/": 519 arcname_large = arcname_large[1:] 520 521 zip_file.close() 522 zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True) 523 524 try: 525 sha1_hash = sha1() 526 for data in large: 527 sha1_hash.update(data) 528 test_file.write(data) 529 test_file.close() 530 531 expected_stat = os.stat(test_file_name) 532 expected_mode = 0o644 533 expected_compress_type = extra_args.get("compress_type", 534 zipfile.ZIP_STORED) 535 time.sleep(5) # Make sure the atime/mtime will change measurably. 536 537 common.ZipWrite(zip_file, test_file_name, **extra_args) 538 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args) 539 common.ZipClose(zip_file) 540 541 # Verify the contents written by ZipWrite(). 542 self._verify(zip_file, zip_file_name, arcname_large, 543 sha1_hash.hexdigest(), test_file_name, expected_stat, 544 expected_mode, expected_compress_type) 545 546 # Verify the contents written by ZipWriteStr(). 547 self._verify(zip_file, zip_file_name, arcname_small, 548 sha1(small).hexdigest(), 549 expected_compress_type=expected_compress_type) 550 finally: 551 os.remove(zip_file_name) 552 os.remove(test_file_name) 553 554 def _test_reset_ZIP64_LIMIT(self, func, *args): 555 default_limit = (1 << 31) - 1 556 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) 557 func(*args) 558 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) 559 560 def test_ZipWrite(self): 561 file_contents = os.urandom(1024) 562 self._test_ZipWrite(file_contents) 563 564 def test_ZipWrite_with_opts(self): 565 file_contents = os.urandom(1024) 566 self._test_ZipWrite(file_contents, { 567 "arcname": "foobar", 568 "perms": 0o777, 569 "compress_type": zipfile.ZIP_DEFLATED, 570 }) 571 self._test_ZipWrite(file_contents, { 572 "arcname": "foobar", 573 "perms": 0o700, 574 "compress_type": zipfile.ZIP_STORED, 575 }) 576 577 def test_ZipWrite_large_file(self): 578 file_contents = get_2gb_string() 579 self._test_ZipWrite(file_contents, { 580 "compress_type": zipfile.ZIP_DEFLATED, 581 }) 582 583 def test_ZipWrite_resets_ZIP64_LIMIT(self): 584 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "") 585 586 def test_ZipWriteStr(self): 587 random_string = os.urandom(1024) 588 # Passing arcname 589 self._test_ZipWriteStr("foo", random_string) 590 591 # Passing zinfo 592 zinfo = zipfile.ZipInfo(filename="foo") 593 self._test_ZipWriteStr(zinfo, random_string) 594 595 # Timestamp in the zinfo should be overwritten. 596 zinfo.date_time = (2015, 3, 1, 15, 30, 0) 597 self._test_ZipWriteStr(zinfo, random_string) 598 599 def test_ZipWriteStr_with_opts(self): 600 random_string = os.urandom(1024) 601 # Passing arcname 602 self._test_ZipWriteStr("foo", random_string, { 603 "perms": 0o700, 604 "compress_type": zipfile.ZIP_DEFLATED, 605 }) 606 self._test_ZipWriteStr("bar", random_string, { 607 "compress_type": zipfile.ZIP_STORED, 608 }) 609 610 # Passing zinfo 611 zinfo = zipfile.ZipInfo(filename="foo") 612 self._test_ZipWriteStr(zinfo, random_string, { 613 "compress_type": zipfile.ZIP_DEFLATED, 614 }) 615 self._test_ZipWriteStr(zinfo, random_string, { 616 "perms": 0o600, 617 "compress_type": zipfile.ZIP_STORED, 618 }) 619 self._test_ZipWriteStr(zinfo, random_string, { 620 "perms": 0o000, 621 "compress_type": zipfile.ZIP_STORED, 622 }) 623 624 def test_ZipWriteStr_large_file(self): 625 # zipfile.writestr() doesn't work when the str size is over 2GiB even with 626 # the workaround. We will only test the case of writing a string into a 627 # large archive. 628 long_string = get_2gb_string() 629 short_string = os.urandom(1024) 630 self._test_ZipWriteStr_large_file(long_string, short_string, { 631 "compress_type": zipfile.ZIP_DEFLATED, 632 }) 633 634 def test_ZipWriteStr_resets_ZIP64_LIMIT(self): 635 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'') 636 zinfo = zipfile.ZipInfo(filename="foo") 637 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'') 638 639 def test_bug21309935(self): 640 zip_file = tempfile.NamedTemporaryFile(delete=False) 641 zip_file_name = zip_file.name 642 zip_file.close() 643 644 try: 645 random_string = os.urandom(1024) 646 zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True) 647 # Default perms should be 0o644 when passing the filename. 648 common.ZipWriteStr(zip_file, "foo", random_string) 649 # Honor the specified perms. 650 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755) 651 # The perms in zinfo should be untouched. 652 zinfo = zipfile.ZipInfo(filename="baz") 653 zinfo.external_attr = 0o740 << 16 654 common.ZipWriteStr(zip_file, zinfo, random_string) 655 # Explicitly specified perms has the priority. 656 zinfo = zipfile.ZipInfo(filename="qux") 657 zinfo.external_attr = 0o700 << 16 658 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400) 659 common.ZipClose(zip_file) 660 661 self._verify(zip_file, zip_file_name, "foo", 662 sha1(random_string).hexdigest(), 663 expected_mode=0o644) 664 self._verify(zip_file, zip_file_name, "bar", 665 sha1(random_string).hexdigest(), 666 expected_mode=0o755) 667 self._verify(zip_file, zip_file_name, "baz", 668 sha1(random_string).hexdigest(), 669 expected_mode=0o740) 670 self._verify(zip_file, zip_file_name, "qux", 671 sha1(random_string).hexdigest(), 672 expected_mode=0o400) 673 finally: 674 os.remove(zip_file_name) 675 676 @test_utils.SkipIfExternalToolsUnavailable() 677 def test_ZipDelete(self): 678 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip') 679 output_zip = zipfile.ZipFile(zip_file.name, 'w', 680 compression=zipfile.ZIP_DEFLATED) 681 with tempfile.NamedTemporaryFile() as entry_file: 682 entry_file.write(os.urandom(1024)) 683 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 684 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 685 common.ZipWrite(output_zip, entry_file.name, arcname='Test3') 686 common.ZipClose(output_zip) 687 zip_file.close() 688 689 try: 690 common.ZipDelete(zip_file.name, 'Test2') 691 with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip: 692 entries = check_zip.namelist() 693 self.assertTrue('Test1' in entries) 694 self.assertFalse('Test2' in entries) 695 self.assertTrue('Test3' in entries) 696 697 self.assertRaises( 698 common.ExternalError, common.ZipDelete, zip_file.name, 'Test2') 699 with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip: 700 entries = check_zip.namelist() 701 self.assertTrue('Test1' in entries) 702 self.assertFalse('Test2' in entries) 703 self.assertTrue('Test3' in entries) 704 705 common.ZipDelete(zip_file.name, ['Test3']) 706 with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip: 707 entries = check_zip.namelist() 708 self.assertTrue('Test1' in entries) 709 self.assertFalse('Test2' in entries) 710 self.assertFalse('Test3' in entries) 711 712 common.ZipDelete(zip_file.name, ['Test1', 'Test2']) 713 with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip: 714 entries = check_zip.namelist() 715 self.assertFalse('Test1' in entries) 716 self.assertFalse('Test2' in entries) 717 self.assertFalse('Test3' in entries) 718 finally: 719 os.remove(zip_file.name) 720 721 @staticmethod 722 def _test_UnzipTemp_createZipFile(): 723 zip_file = common.MakeTempFile(suffix='.zip') 724 output_zip = zipfile.ZipFile( 725 zip_file, 'w', compression=zipfile.ZIP_DEFLATED) 726 contents = os.urandom(1024) 727 with tempfile.NamedTemporaryFile() as entry_file: 728 entry_file.write(contents) 729 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 730 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 731 common.ZipWrite(output_zip, entry_file.name, arcname='Foo3') 732 common.ZipWrite(output_zip, entry_file.name, arcname='Bar4') 733 common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5') 734 common.ZipClose(output_zip) 735 common.ZipClose(output_zip) 736 return zip_file 737 738 @test_utils.SkipIfExternalToolsUnavailable() 739 def test_UnzipTemp(self): 740 zip_file = self._test_UnzipTemp_createZipFile() 741 unzipped_dir = common.UnzipTemp(zip_file) 742 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 743 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 744 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 745 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 746 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 747 748 @test_utils.SkipIfExternalToolsUnavailable() 749 def test_UnzipTemp_withPatterns(self): 750 zip_file = self._test_UnzipTemp_createZipFile() 751 752 unzipped_dir = common.UnzipTemp(zip_file, ['Test1']) 753 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 754 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 755 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 756 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 757 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 758 759 unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3']) 760 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 761 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 762 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 763 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 764 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 765 766 unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*']) 767 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 768 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 769 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 770 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 771 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 772 773 unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*']) 774 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 775 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 776 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 777 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 778 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 779 780 def test_UnzipTemp_withEmptyPatterns(self): 781 zip_file = self._test_UnzipTemp_createZipFile() 782 unzipped_dir = common.UnzipTemp(zip_file, []) 783 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 784 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 785 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 786 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 787 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 788 789 @test_utils.SkipIfExternalToolsUnavailable() 790 def test_UnzipTemp_withPartiallyMatchingPatterns(self): 791 zip_file = self._test_UnzipTemp_createZipFile() 792 unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*']) 793 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 794 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 795 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 796 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 797 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 798 799 def test_UnzipTemp_withNoMatchingPatterns(self): 800 zip_file = self._test_UnzipTemp_createZipFile() 801 unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*']) 802 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 803 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 804 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 805 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 806 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 807 808 809class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase): 810 """Tests the APK utils related functions.""" 811 812 APKCERTS_TXT1 = ( 813 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"' 814 ' private_key="certs/devkey.pk8"\n' 815 'name="Settings.apk"' 816 ' certificate="build/make/target/product/security/platform.x509.pem"' 817 ' private_key="build/make/target/product/security/platform.pk8"\n' 818 'name="TV.apk" certificate="PRESIGNED" private_key=""\n' 819 ) 820 821 APKCERTS_CERTMAP1 = { 822 'RecoveryLocalizer.apk' : 'certs/devkey', 823 'Settings.apk' : 'build/make/target/product/security/platform', 824 'TV.apk' : 'PRESIGNED', 825 } 826 827 APKCERTS_TXT2 = ( 828 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"' 829 ' private_key="certs/compressed1.pk8" compressed="gz"\n' 830 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"' 831 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 832 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"' 833 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 834 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"' 835 ' private_key="certs/compressed3.pk8" compressed="gz"\n' 836 ) 837 838 APKCERTS_CERTMAP2 = { 839 'Compressed1.apk' : 'certs/compressed1', 840 'Compressed2a.apk' : 'certs/compressed2', 841 'Compressed2b.apk' : 'certs/compressed2', 842 'Compressed3.apk' : 'certs/compressed3', 843 } 844 845 APKCERTS_TXT3 = ( 846 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"' 847 ' private_key="certs/compressed4.pk8" compressed="xz"\n' 848 ) 849 850 APKCERTS_CERTMAP3 = { 851 'Compressed4.apk' : 'certs/compressed4', 852 } 853 854 # Test parsing with no optional fields, both optional fields, and only the 855 # partition optional field. 856 APKCERTS_TXT4 = ( 857 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"' 858 ' private_key="certs/devkey.pk8"\n' 859 'name="Settings.apk"' 860 ' certificate="build/make/target/product/security/platform.x509.pem"' 861 ' private_key="build/make/target/product/security/platform.pk8"' 862 ' compressed="gz" partition="system"\n' 863 'name="TV.apk" certificate="PRESIGNED" private_key=""' 864 ' partition="product"\n' 865 ) 866 867 APKCERTS_CERTMAP4 = { 868 'RecoveryLocalizer.apk' : 'certs/devkey', 869 'Settings.apk' : 'build/make/target/product/security/platform', 870 'TV.apk' : 'PRESIGNED', 871 } 872 873 def setUp(self): 874 self.testdata_dir = test_utils.get_testdata_dir() 875 876 @staticmethod 877 def _write_apkcerts_txt(apkcerts_txt, additional=None): 878 if additional is None: 879 additional = [] 880 target_files = common.MakeTempFile(suffix='.zip') 881 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 882 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt) 883 for entry in additional: 884 target_files_zip.writestr(entry, '') 885 return target_files 886 887 def test_ReadApkCerts_NoncompressedApks(self): 888 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1) 889 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 890 certmap, ext = common.ReadApkCerts(input_zip) 891 892 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap) 893 self.assertIsNone(ext) 894 895 def test_ReadApkCerts_CompressedApks(self): 896 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is 897 # not stored in '.gz' format, so it shouldn't be considered as installed. 898 target_files = self._write_apkcerts_txt( 899 self.APKCERTS_TXT2, 900 ['Compressed1.apk.gz', 'Compressed3.apk']) 901 902 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 903 certmap, ext = common.ReadApkCerts(input_zip) 904 905 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap) 906 self.assertEqual('.gz', ext) 907 908 # Alternative case with '.xz'. 909 target_files = self._write_apkcerts_txt( 910 self.APKCERTS_TXT3, ['Compressed4.apk.xz']) 911 912 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 913 certmap, ext = common.ReadApkCerts(input_zip) 914 915 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap) 916 self.assertEqual('.xz', ext) 917 918 def test_ReadApkCerts_CompressedAndNoncompressedApks(self): 919 target_files = self._write_apkcerts_txt( 920 self.APKCERTS_TXT1 + self.APKCERTS_TXT2, 921 ['Compressed1.apk.gz', 'Compressed3.apk']) 922 923 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 924 certmap, ext = common.ReadApkCerts(input_zip) 925 926 certmap_merged = self.APKCERTS_CERTMAP1.copy() 927 certmap_merged.update(self.APKCERTS_CERTMAP2) 928 self.assertDictEqual(certmap_merged, certmap) 929 self.assertEqual('.gz', ext) 930 931 def test_ReadApkCerts_MultipleCompressionMethods(self): 932 target_files = self._write_apkcerts_txt( 933 self.APKCERTS_TXT2 + self.APKCERTS_TXT3, 934 ['Compressed1.apk.gz', 'Compressed4.apk.xz']) 935 936 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 937 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 938 939 def test_ReadApkCerts_MismatchingKeys(self): 940 malformed_apkcerts_txt = ( 941 'name="App1.apk" certificate="certs/cert1.x509.pem"' 942 ' private_key="certs/cert2.pk8"\n' 943 ) 944 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt) 945 946 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 947 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 948 949 def test_ReadApkCerts_WithWithoutOptionalFields(self): 950 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4) 951 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 952 certmap, ext = common.ReadApkCerts(input_zip) 953 954 self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap) 955 self.assertIsNone(ext) 956 957 def test_ExtractPublicKey(self): 958 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 959 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 960 with open(pubkey) as pubkey_fp: 961 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert)) 962 963 def test_ExtractPublicKey_invalidInput(self): 964 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8') 965 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input) 966 967 @test_utils.SkipIfExternalToolsUnavailable() 968 def test_ExtractAvbPublicKey(self): 969 privkey = os.path.join(self.testdata_dir, 'testkey.key') 970 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 971 extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey) 972 extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey) 973 with open(extracted_from_privkey, 'rb') as privkey_fp, \ 974 open(extracted_from_pubkey, 'rb') as pubkey_fp: 975 self.assertEqual(privkey_fp.read(), pubkey_fp.read()) 976 977 def test_ParseCertificate(self): 978 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 979 980 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER'] 981 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, 982 universal_newlines=False) 983 expected, _ = proc.communicate() 984 self.assertEqual(0, proc.returncode) 985 986 with open(cert) as cert_fp: 987 actual = common.ParseCertificate(cert_fp.read()) 988 self.assertEqual(expected, actual) 989 990 @test_utils.SkipIfExternalToolsUnavailable() 991 def test_GetMinSdkVersion(self): 992 test_app = os.path.join(self.testdata_dir, 'TestApp.apk') 993 self.assertEqual('24', common.GetMinSdkVersion(test_app)) 994 995 @test_utils.SkipIfExternalToolsUnavailable() 996 def test_GetMinSdkVersion_invalidInput(self): 997 self.assertRaises( 998 common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk') 999 1000 @test_utils.SkipIfExternalToolsUnavailable() 1001 def test_GetMinSdkVersionInt(self): 1002 test_app = os.path.join(self.testdata_dir, 'TestApp.apk') 1003 self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {})) 1004 1005 @test_utils.SkipIfExternalToolsUnavailable() 1006 def test_GetMinSdkVersionInt_invalidInput(self): 1007 self.assertRaises( 1008 common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk', 1009 {}) 1010 1011 1012class CommonUtilsTest(test_utils.ReleaseToolsTestCase): 1013 1014 def setUp(self): 1015 self.testdata_dir = test_utils.get_testdata_dir() 1016 1017 @test_utils.SkipIfExternalToolsUnavailable() 1018 def test_GetSparseImage_emptyBlockMapFile(self): 1019 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1020 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1021 target_files_zip.write( 1022 test_utils.construct_sparse_image([ 1023 (0xCAC1, 6), 1024 (0xCAC3, 3), 1025 (0xCAC1, 4)]), 1026 arcname='IMAGES/system.img') 1027 target_files_zip.writestr('IMAGES/system.map', '') 1028 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 1029 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1030 1031 tempdir = common.UnzipTemp(target_files) 1032 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1033 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1034 1035 self.assertDictEqual( 1036 { 1037 '__COPY': RangeSet("0"), 1038 '__NONZERO-0': RangeSet("1-5 9-12"), 1039 }, 1040 sparse_image.file_map) 1041 1042 def test_PartitionMapFromTargetFiles(self): 1043 target_files_dir = common.MakeTempDir() 1044 os.makedirs(os.path.join(target_files_dir, 'SYSTEM')) 1045 os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor')) 1046 os.makedirs(os.path.join(target_files_dir, 'PRODUCT')) 1047 os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'product')) 1048 os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor', 'odm')) 1049 os.makedirs(os.path.join(target_files_dir, 'VENDOR_DLKM')) 1050 partition_map = common.PartitionMapFromTargetFiles(target_files_dir) 1051 self.assertDictEqual( 1052 partition_map, 1053 { 1054 'system': 'SYSTEM', 1055 'vendor': 'SYSTEM/vendor', 1056 # Prefer PRODUCT over SYSTEM/product 1057 'product': 'PRODUCT', 1058 'odm': 'SYSTEM/vendor/odm', 1059 'vendor_dlkm': 'VENDOR_DLKM', 1060 # No system_ext or odm_dlkm 1061 }) 1062 1063 def test_SharedUidPartitionViolations(self): 1064 uid_dict = { 1065 'android.uid.phone': { 1066 'system': ['system_phone.apk'], 1067 'system_ext': ['system_ext_phone.apk'], 1068 }, 1069 'android.uid.wifi': { 1070 'vendor': ['vendor_wifi.apk'], 1071 'odm': ['odm_wifi.apk'], 1072 }, 1073 } 1074 errors = common.SharedUidPartitionViolations( 1075 uid_dict, [('system', 'system_ext'), ('vendor', 'odm')]) 1076 self.assertEqual(errors, []) 1077 1078 def test_SharedUidPartitionViolations_Violation(self): 1079 uid_dict = { 1080 'android.uid.phone': { 1081 'system': ['system_phone.apk'], 1082 'vendor': ['vendor_phone.apk'], 1083 }, 1084 } 1085 errors = common.SharedUidPartitionViolations( 1086 uid_dict, [('system', 'system_ext'), ('vendor', 'odm')]) 1087 self.assertIn( 1088 ('APK sharedUserId "android.uid.phone" found across partition groups ' 1089 'in partitions "system,vendor"'), errors) 1090 1091 def test_GetSparseImage_missingImageFile(self): 1092 self.assertRaises( 1093 AssertionError, common.GetSparseImage, 'system2', self.testdata_dir, 1094 None, False) 1095 self.assertRaises( 1096 AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir, 1097 None, False) 1098 1099 @test_utils.SkipIfExternalToolsUnavailable() 1100 def test_GetSparseImage_missingBlockMapFile(self): 1101 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1102 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1103 target_files_zip.write( 1104 test_utils.construct_sparse_image([ 1105 (0xCAC1, 6), 1106 (0xCAC3, 3), 1107 (0xCAC1, 4)]), 1108 arcname='IMAGES/system.img') 1109 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 1110 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1111 1112 tempdir = common.UnzipTemp(target_files) 1113 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1114 self.assertRaises( 1115 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 1116 False) 1117 1118 @test_utils.SkipIfExternalToolsUnavailable() 1119 def test_GetSparseImage_sharedBlocks_notAllowed(self): 1120 """Tests the case of having overlapping blocks but disallowed.""" 1121 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1122 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1123 target_files_zip.write( 1124 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1125 arcname='IMAGES/system.img') 1126 # Block 10 is shared between two files. 1127 target_files_zip.writestr( 1128 'IMAGES/system.map', 1129 '\n'.join([ 1130 '/system/file1 1-5 9-10', 1131 '/system/file2 10-12'])) 1132 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1133 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1134 1135 tempdir = common.UnzipTemp(target_files) 1136 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1137 self.assertRaises( 1138 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 1139 False) 1140 1141 @test_utils.SkipIfExternalToolsUnavailable() 1142 def test_GetSparseImage_sharedBlocks_allowed(self): 1143 """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true.""" 1144 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1145 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1146 # Construct an image with a care_map of "0-5 9-12". 1147 target_files_zip.write( 1148 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1149 arcname='IMAGES/system.img') 1150 # Block 10 is shared between two files. 1151 target_files_zip.writestr( 1152 'IMAGES/system.map', 1153 '\n'.join([ 1154 '/system/file1 1-5 9-10', 1155 '/system/file2 10-12'])) 1156 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1157 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1158 1159 tempdir = common.UnzipTemp(target_files) 1160 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1161 sparse_image = common.GetSparseImage('system', tempdir, input_zip, True) 1162 1163 self.assertDictEqual( 1164 { 1165 '__COPY': RangeSet("0"), 1166 '__NONZERO-0': RangeSet("6-8 13-15"), 1167 '/system/file1': RangeSet("1-5 9-10"), 1168 '/system/file2': RangeSet("11-12"), 1169 }, 1170 sparse_image.file_map) 1171 1172 # '/system/file2' should be marked with 'uses_shared_blocks', but not with 1173 # 'incomplete'. 1174 self.assertTrue( 1175 sparse_image.file_map['/system/file2'].extra['uses_shared_blocks']) 1176 self.assertNotIn( 1177 'incomplete', sparse_image.file_map['/system/file2'].extra) 1178 1179 # '/system/file1' will only contain one field -- a copy of the input text. 1180 self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra)) 1181 1182 # Meta entries should not have any extra tag. 1183 self.assertFalse(sparse_image.file_map['__COPY'].extra) 1184 self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra) 1185 1186 @test_utils.SkipIfExternalToolsUnavailable() 1187 def test_GetSparseImage_incompleteRanges(self): 1188 """Tests the case of ext4 images with holes.""" 1189 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1190 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1191 target_files_zip.write( 1192 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1193 arcname='IMAGES/system.img') 1194 target_files_zip.writestr( 1195 'IMAGES/system.map', 1196 '\n'.join([ 1197 '/system/file1 1-5 9-10', 1198 '/system/file2 11-12'])) 1199 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1200 # '/system/file2' has less blocks listed (2) than actual (3). 1201 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1202 1203 tempdir = common.UnzipTemp(target_files) 1204 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1205 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1206 1207 self.assertEqual( 1208 '1-5 9-10', 1209 sparse_image.file_map['/system/file1'].extra['text_str']) 1210 self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete']) 1211 1212 @test_utils.SkipIfExternalToolsUnavailable() 1213 def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self): 1214 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1215 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1216 target_files_zip.write( 1217 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1218 arcname='IMAGES/system.img') 1219 target_files_zip.writestr( 1220 'IMAGES/system.map', 1221 '\n'.join([ 1222 '//system/file1 1-5 9-10', 1223 '//system/file2 11-12', 1224 '/system/app/file3 13-15'])) 1225 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1226 # '/system/file2' has less blocks listed (2) than actual (3). 1227 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 1228 # '/system/app/file3' has less blocks listed (3) than actual (4). 1229 target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4)) 1230 1231 tempdir = common.UnzipTemp(target_files) 1232 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1233 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1234 1235 self.assertEqual( 1236 '1-5 9-10', 1237 sparse_image.file_map['//system/file1'].extra['text_str']) 1238 self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete']) 1239 self.assertTrue( 1240 sparse_image.file_map['/system/app/file3'].extra['incomplete']) 1241 1242 @test_utils.SkipIfExternalToolsUnavailable() 1243 def test_GetSparseImage_systemRootImage_nonSystemFiles(self): 1244 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1245 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1246 target_files_zip.write( 1247 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1248 arcname='IMAGES/system.img') 1249 target_files_zip.writestr( 1250 'IMAGES/system.map', 1251 '\n'.join([ 1252 '//system/file1 1-5 9-10', 1253 '//init.rc 13-15'])) 1254 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1255 # '/init.rc' has less blocks listed (3) than actual (4). 1256 target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4)) 1257 1258 tempdir = common.UnzipTemp(target_files) 1259 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1260 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 1261 1262 self.assertEqual( 1263 '1-5 9-10', 1264 sparse_image.file_map['//system/file1'].extra['text_str']) 1265 self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete']) 1266 1267 @test_utils.SkipIfExternalToolsUnavailable() 1268 def test_GetSparseImage_fileNotFound(self): 1269 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1270 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1271 target_files_zip.write( 1272 test_utils.construct_sparse_image([(0xCAC2, 16)]), 1273 arcname='IMAGES/system.img') 1274 target_files_zip.writestr( 1275 'IMAGES/system.map', 1276 '\n'.join([ 1277 '//system/file1 1-5 9-10', 1278 '//system/file2 11-12'])) 1279 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 1280 1281 tempdir = common.UnzipTemp(target_files) 1282 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip: 1283 self.assertRaises( 1284 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 1285 False) 1286 1287 @test_utils.SkipIfExternalToolsUnavailable() 1288 def test_GetAvbChainedPartitionArg(self): 1289 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 1290 info_dict = { 1291 'avb_avbtool': 'avbtool', 1292 'avb_system_key_path': pubkey, 1293 'avb_system_rollback_index_location': 2, 1294 } 1295 args = common.GetAvbChainedPartitionArg('system', info_dict).split(':') 1296 self.assertEqual(3, len(args)) 1297 self.assertEqual('system', args[0]) 1298 self.assertEqual('2', args[1]) 1299 self.assertTrue(os.path.exists(args[2])) 1300 1301 @test_utils.SkipIfExternalToolsUnavailable() 1302 def test_GetAvbChainedPartitionArg_withPrivateKey(self): 1303 key = os.path.join(self.testdata_dir, 'testkey.key') 1304 info_dict = { 1305 'avb_avbtool': 'avbtool', 1306 'avb_product_key_path': key, 1307 'avb_product_rollback_index_location': 2, 1308 } 1309 args = common.GetAvbChainedPartitionArg('product', info_dict).split(':') 1310 self.assertEqual(3, len(args)) 1311 self.assertEqual('product', args[0]) 1312 self.assertEqual('2', args[1]) 1313 self.assertTrue(os.path.exists(args[2])) 1314 1315 @test_utils.SkipIfExternalToolsUnavailable() 1316 def test_GetAvbChainedPartitionArg_withSpecifiedKey(self): 1317 info_dict = { 1318 'avb_avbtool': 'avbtool', 1319 'avb_system_key_path': 'does-not-exist', 1320 'avb_system_rollback_index_location': 2, 1321 } 1322 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 1323 args = common.GetAvbChainedPartitionArg( 1324 'system', info_dict, pubkey).split(':') 1325 self.assertEqual(3, len(args)) 1326 self.assertEqual('system', args[0]) 1327 self.assertEqual('2', args[1]) 1328 self.assertTrue(os.path.exists(args[2])) 1329 1330 @test_utils.SkipIfExternalToolsUnavailable() 1331 def test_GetAvbChainedPartitionArg_invalidKey(self): 1332 pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem') 1333 info_dict = { 1334 'avb_avbtool': 'avbtool', 1335 'avb_system_key_path': pubkey, 1336 'avb_system_rollback_index_location': 2, 1337 } 1338 self.assertRaises( 1339 common.ExternalError, common.GetAvbChainedPartitionArg, 'system', 1340 info_dict) 1341 1342 INFO_DICT_DEFAULT = { 1343 'recovery_api_version': 3, 1344 'fstab_version': 2, 1345 'system_root_image': 'true', 1346 'no_recovery' : 'true', 1347 'recovery_as_boot': 'true', 1348 } 1349 1350 def test_LoadListFromFile(self): 1351 file_path = os.path.join(self.testdata_dir, 1352 'merge_config_framework_item_list') 1353 contents = common.LoadListFromFile(file_path) 1354 expected_contents = [ 1355 'META/apkcerts.txt', 1356 'META/filesystem_config.txt', 1357 'META/root_filesystem_config.txt', 1358 'META/system_manifest.xml', 1359 'META/system_matrix.xml', 1360 'META/update_engine_config.txt', 1361 'PRODUCT/*', 1362 'ROOT/*', 1363 'SYSTEM/*', 1364 ] 1365 self.assertEqual(sorted(contents), sorted(expected_contents)) 1366 1367 @staticmethod 1368 def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path): 1369 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 1370 with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip: 1371 info_values = ''.join( 1372 ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())]) 1373 common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values) 1374 1375 FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults" 1376 if info_dict.get('system_root_image') == 'true': 1377 fstab_values = FSTAB_TEMPLATE.format('/') 1378 else: 1379 fstab_values = FSTAB_TEMPLATE.format('/system') 1380 common.ZipWriteStr(target_files_zip, fstab_path, fstab_values) 1381 1382 common.ZipWriteStr( 1383 target_files_zip, 'META/file_contexts', 'file-contexts') 1384 return target_files 1385 1386 def test_LoadInfoDict(self): 1387 target_files = self._test_LoadInfoDict_createTargetFiles( 1388 self.INFO_DICT_DEFAULT, 1389 'BOOT/RAMDISK/system/etc/recovery.fstab') 1390 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1391 loaded_dict = common.LoadInfoDict(target_files_zip) 1392 self.assertEqual(3, loaded_dict['recovery_api_version']) 1393 self.assertEqual(2, loaded_dict['fstab_version']) 1394 self.assertIn('/', loaded_dict['fstab']) 1395 self.assertIn('/system', loaded_dict['fstab']) 1396 1397 def test_LoadInfoDict_legacyRecoveryFstabPath(self): 1398 target_files = self._test_LoadInfoDict_createTargetFiles( 1399 self.INFO_DICT_DEFAULT, 1400 'BOOT/RAMDISK/etc/recovery.fstab') 1401 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1402 loaded_dict = common.LoadInfoDict(target_files_zip) 1403 self.assertEqual(3, loaded_dict['recovery_api_version']) 1404 self.assertEqual(2, loaded_dict['fstab_version']) 1405 self.assertIn('/', loaded_dict['fstab']) 1406 self.assertIn('/system', loaded_dict['fstab']) 1407 1408 @test_utils.SkipIfExternalToolsUnavailable() 1409 def test_LoadInfoDict_dirInput(self): 1410 target_files = self._test_LoadInfoDict_createTargetFiles( 1411 self.INFO_DICT_DEFAULT, 1412 'BOOT/RAMDISK/system/etc/recovery.fstab') 1413 unzipped = common.UnzipTemp(target_files) 1414 loaded_dict = common.LoadInfoDict(unzipped) 1415 self.assertEqual(3, loaded_dict['recovery_api_version']) 1416 self.assertEqual(2, loaded_dict['fstab_version']) 1417 self.assertIn('/', loaded_dict['fstab']) 1418 self.assertIn('/system', loaded_dict['fstab']) 1419 1420 @test_utils.SkipIfExternalToolsUnavailable() 1421 def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self): 1422 target_files = self._test_LoadInfoDict_createTargetFiles( 1423 self.INFO_DICT_DEFAULT, 1424 'BOOT/RAMDISK/system/etc/recovery.fstab') 1425 unzipped = common.UnzipTemp(target_files) 1426 loaded_dict = common.LoadInfoDict(unzipped) 1427 self.assertEqual(3, loaded_dict['recovery_api_version']) 1428 self.assertEqual(2, loaded_dict['fstab_version']) 1429 self.assertIn('/', loaded_dict['fstab']) 1430 self.assertIn('/system', loaded_dict['fstab']) 1431 1432 def test_LoadInfoDict_systemRootImageFalse(self): 1433 # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices 1434 # launched prior to P will likely have this config. 1435 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 1436 del info_dict['no_recovery'] 1437 del info_dict['system_root_image'] 1438 del info_dict['recovery_as_boot'] 1439 target_files = self._test_LoadInfoDict_createTargetFiles( 1440 info_dict, 1441 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 1442 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1443 loaded_dict = common.LoadInfoDict(target_files_zip) 1444 self.assertEqual(3, loaded_dict['recovery_api_version']) 1445 self.assertEqual(2, loaded_dict['fstab_version']) 1446 self.assertNotIn('/', loaded_dict['fstab']) 1447 self.assertIn('/system', loaded_dict['fstab']) 1448 1449 def test_LoadInfoDict_recoveryAsBootFalse(self): 1450 # Devices using system-as-root, but with standalone recovery image. Non-A/B 1451 # devices launched since P will likely have this config. 1452 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 1453 del info_dict['no_recovery'] 1454 del info_dict['recovery_as_boot'] 1455 target_files = self._test_LoadInfoDict_createTargetFiles( 1456 info_dict, 1457 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 1458 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1459 loaded_dict = common.LoadInfoDict(target_files_zip) 1460 self.assertEqual(3, loaded_dict['recovery_api_version']) 1461 self.assertEqual(2, loaded_dict['fstab_version']) 1462 self.assertIn('/', loaded_dict['fstab']) 1463 self.assertIn('/system', loaded_dict['fstab']) 1464 1465 def test_LoadInfoDict_noRecoveryTrue(self): 1466 # Device doesn't have a recovery partition at all. 1467 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 1468 del info_dict['recovery_as_boot'] 1469 target_files = self._test_LoadInfoDict_createTargetFiles( 1470 info_dict, 1471 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 1472 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1473 loaded_dict = common.LoadInfoDict(target_files_zip) 1474 self.assertEqual(3, loaded_dict['recovery_api_version']) 1475 self.assertEqual(2, loaded_dict['fstab_version']) 1476 self.assertIsNone(loaded_dict['fstab']) 1477 1478 @test_utils.SkipIfExternalToolsUnavailable() 1479 def test_LoadInfoDict_missingMetaMiscInfoTxt(self): 1480 target_files = self._test_LoadInfoDict_createTargetFiles( 1481 self.INFO_DICT_DEFAULT, 1482 'BOOT/RAMDISK/system/etc/recovery.fstab') 1483 common.ZipDelete(target_files, 'META/misc_info.txt') 1484 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1485 self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip) 1486 1487 @test_utils.SkipIfExternalToolsUnavailable() 1488 def test_LoadInfoDict_repacking(self): 1489 target_files = self._test_LoadInfoDict_createTargetFiles( 1490 self.INFO_DICT_DEFAULT, 1491 'BOOT/RAMDISK/system/etc/recovery.fstab') 1492 unzipped = common.UnzipTemp(target_files) 1493 loaded_dict = common.LoadInfoDict(unzipped, True) 1494 self.assertEqual(3, loaded_dict['recovery_api_version']) 1495 self.assertEqual(2, loaded_dict['fstab_version']) 1496 self.assertIn('/', loaded_dict['fstab']) 1497 self.assertIn('/system', loaded_dict['fstab']) 1498 self.assertEqual( 1499 os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir']) 1500 self.assertEqual( 1501 os.path.join(unzipped, 'META', 'root_filesystem_config.txt'), 1502 loaded_dict['root_fs_config']) 1503 1504 def test_LoadInfoDict_repackingWithZipFileInput(self): 1505 target_files = self._test_LoadInfoDict_createTargetFiles( 1506 self.INFO_DICT_DEFAULT, 1507 'BOOT/RAMDISK/system/etc/recovery.fstab') 1508 with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip: 1509 self.assertRaises( 1510 AssertionError, common.LoadInfoDict, target_files_zip, True) 1511 1512 def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self): 1513 framework_dict = { 1514 'use_dynamic_partitions': 'true', 1515 'super_partition_groups': 'group_a', 1516 'dynamic_partition_list': 'system', 1517 'super_group_a_partition_list': 'system', 1518 } 1519 vendor_dict = { 1520 'use_dynamic_partitions': 'true', 1521 'super_partition_groups': 'group_a group_b', 1522 'dynamic_partition_list': 'vendor product', 1523 'super_block_devices': 'super', 1524 'super_super_device_size': '3000', 1525 'super_group_a_partition_list': 'vendor', 1526 'super_group_a_group_size': '1000', 1527 'super_group_b_partition_list': 'product', 1528 'super_group_b_group_size': '2000', 1529 } 1530 merged_dict = common.MergeDynamicPartitionInfoDicts( 1531 framework_dict=framework_dict, 1532 vendor_dict=vendor_dict) 1533 expected_merged_dict = { 1534 'use_dynamic_partitions': 'true', 1535 'super_partition_groups': 'group_a group_b', 1536 'dynamic_partition_list': 'product system vendor', 1537 'super_block_devices': 'super', 1538 'super_super_device_size': '3000', 1539 'super_group_a_partition_list': 'system vendor', 1540 'super_group_a_group_size': '1000', 1541 'super_group_b_partition_list': 'product', 1542 'super_group_b_group_size': '2000', 1543 } 1544 self.assertEqual(merged_dict, expected_merged_dict) 1545 1546 def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self): 1547 framework_dict = { 1548 'use_dynamic_partitions': 'true', 1549 'super_partition_groups': 'group_a', 1550 'dynamic_partition_list': 'system', 1551 'super_group_a_partition_list': 'system', 1552 'super_group_a_group_size': '5000', 1553 } 1554 vendor_dict = { 1555 'use_dynamic_partitions': 'true', 1556 'super_partition_groups': 'group_a group_b', 1557 'dynamic_partition_list': 'vendor product', 1558 'super_group_a_partition_list': 'vendor', 1559 'super_group_a_group_size': '1000', 1560 'super_group_b_partition_list': 'product', 1561 'super_group_b_group_size': '2000', 1562 } 1563 merged_dict = common.MergeDynamicPartitionInfoDicts( 1564 framework_dict=framework_dict, 1565 vendor_dict=vendor_dict) 1566 expected_merged_dict = { 1567 'use_dynamic_partitions': 'true', 1568 'super_partition_groups': 'group_a group_b', 1569 'dynamic_partition_list': 'product system vendor', 1570 'super_group_a_partition_list': 'system vendor', 1571 'super_group_a_group_size': '1000', 1572 'super_group_b_partition_list': 'product', 1573 'super_group_b_group_size': '2000', 1574 } 1575 self.assertEqual(merged_dict, expected_merged_dict) 1576 1577 def test_GetAvbPartitionArg(self): 1578 info_dict = {} 1579 cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict) 1580 self.assertEqual( 1581 ['--include_descriptors_from_image', '/path/to/system.img'], cmd) 1582 1583 @test_utils.SkipIfExternalToolsUnavailable() 1584 def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self): 1585 testdata_dir = test_utils.get_testdata_dir() 1586 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem') 1587 info_dict = { 1588 'avb_avbtool': 'avbtool', 1589 'avb_vendor_key_path': pubkey, 1590 'avb_vendor_rollback_index_location': 5, 1591 } 1592 cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict) 1593 self.assertEqual(2, len(cmd)) 1594 self.assertEqual('--chain_partition', cmd[0]) 1595 chained_partition_args = cmd[1].split(':') 1596 self.assertEqual(3, len(chained_partition_args)) 1597 self.assertEqual('vendor', chained_partition_args[0]) 1598 self.assertEqual('5', chained_partition_args[1]) 1599 self.assertTrue(os.path.exists(chained_partition_args[2])) 1600 1601 @test_utils.SkipIfExternalToolsUnavailable() 1602 def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self): 1603 testdata_dir = test_utils.get_testdata_dir() 1604 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem') 1605 info_dict = { 1606 'avb_avbtool': 'avbtool', 1607 'avb_recovery_key_path': pubkey, 1608 'avb_recovery_rollback_index_location': 3, 1609 } 1610 cmd = common.GetAvbPartitionArg( 1611 'recovery', '/path/to/recovery.img', info_dict) 1612 self.assertFalse(cmd) 1613 1614 @test_utils.SkipIfExternalToolsUnavailable() 1615 def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self): 1616 testdata_dir = test_utils.get_testdata_dir() 1617 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem') 1618 info_dict = { 1619 'ab_update': 'true', 1620 'avb_avbtool': 'avbtool', 1621 'avb_recovery_key_path': pubkey, 1622 'avb_recovery_rollback_index_location': 3, 1623 } 1624 cmd = common.GetAvbPartitionArg( 1625 'recovery', '/path/to/recovery.img', info_dict) 1626 self.assertEqual(2, len(cmd)) 1627 self.assertEqual('--chain_partition', cmd[0]) 1628 chained_partition_args = cmd[1].split(':') 1629 self.assertEqual(3, len(chained_partition_args)) 1630 self.assertEqual('recovery', chained_partition_args[0]) 1631 self.assertEqual('3', chained_partition_args[1]) 1632 self.assertTrue(os.path.exists(chained_partition_args[2])) 1633 1634 def test_BuildVBMeta_appendAftlCommandSyntax(self): 1635 testdata_dir = test_utils.get_testdata_dir() 1636 common.OPTIONS.info_dict = { 1637 'ab_update': 'true', 1638 'avb_avbtool': 'avbtool', 1639 'build.prop': common.PartitionBuildProps.FromDictionary( 1640 'system', { 1641 'ro.build.version.incremental': '6285659', 1642 'ro.product.device': 'coral', 1643 'ro.build.fingerprint': 1644 'google/coral/coral:R/RP1A.200311.002/' 1645 '6285659:userdebug/dev-keys'} 1646 ), 1647 } 1648 common.OPTIONS.aftl_tool_path = 'aftltool' 1649 common.OPTIONS.aftl_server = 'log.endpoints.aftl-dev.cloud.goog:9000' 1650 common.OPTIONS.aftl_key_path = os.path.join(testdata_dir, 1651 'test_transparency_key.pub') 1652 common.OPTIONS.aftl_manufacturer_key_path = os.path.join( 1653 testdata_dir, 'test_aftl_rsa4096.pem') 1654 1655 vbmeta_image = tempfile.NamedTemporaryFile(delete=False) 1656 cmd = common.ConstructAftlMakeImageCommands(vbmeta_image.name) 1657 expected_cmd = [ 1658 'aftltool', 'make_icp_from_vbmeta', 1659 '--vbmeta_image_path', 'place_holder', 1660 '--output', vbmeta_image.name, 1661 '--version_incremental', '6285659', 1662 '--transparency_log_servers', 1663 'log.endpoints.aftl-dev.cloud.goog:9000,{}'.format( 1664 common.OPTIONS.aftl_key_path), 1665 '--manufacturer_key', common.OPTIONS.aftl_manufacturer_key_path, 1666 '--algorithm', 'SHA256_RSA4096', 1667 '--padding', '4096'] 1668 1669 # ignore the place holder, i.e. path to a temp file 1670 self.assertEqual(cmd[:3], expected_cmd[:3]) 1671 self.assertEqual(cmd[4:], expected_cmd[4:]) 1672 1673 @unittest.skip("enable after we have a server for public") 1674 def test_BuildVBMeta_appendAftlContactServer(self): 1675 testdata_dir = test_utils.get_testdata_dir() 1676 common.OPTIONS.info_dict = { 1677 'ab_update': 'true', 1678 'avb_avbtool': 'avbtool', 1679 'build.prop': common.PartitionBuildProps.FromDictionary( 1680 'system', { 1681 'ro.build.version.incremental': '6285659', 1682 'ro.product.device': 'coral', 1683 'ro.build.fingerprint': 1684 'google/coral/coral:R/RP1A.200311.002/' 1685 '6285659:userdebug/dev-keys'} 1686 ) 1687 } 1688 common.OPTIONS.aftl_tool_path = "aftltool" 1689 common.OPTIONS.aftl_server = "log.endpoints.aftl-dev.cloud.goog:9000" 1690 common.OPTIONS.aftl_key_path = os.path.join(testdata_dir, 1691 'test_transparency_key.pub') 1692 common.OPTIONS.aftl_manufacturer_key_path = os.path.join( 1693 testdata_dir, 'test_aftl_rsa4096.pem') 1694 1695 input_dir = common.MakeTempDir() 1696 system_image = common.MakeTempFile() 1697 build_image_cmd = ['mkuserimg_mke2fs', input_dir, system_image, 'ext4', 1698 '/system', str(4096 * 100), '-j', '0', '-s'] 1699 common.RunAndCheckOutput(build_image_cmd) 1700 1701 add_footer_cmd = ['avbtool', 'add_hashtree_footer', 1702 '--partition_size', str(4096 * 150), 1703 '--partition_name', 'system', 1704 '--image', system_image] 1705 common.RunAndCheckOutput(add_footer_cmd) 1706 1707 vbmeta_image = common.MakeTempFile() 1708 common.BuildVBMeta(vbmeta_image, {'system': system_image}, 'vbmeta', 1709 ['system']) 1710 1711 verify_cmd = ['aftltool', 'verify_image_icp', '--vbmeta_image_path', 1712 vbmeta_image, '--transparency_log_pub_keys', 1713 common.OPTIONS.aftl_key_path] 1714 common.RunAndCheckOutput(verify_cmd) 1715 1716 @test_utils.SkipIfExternalToolsUnavailable() 1717 def test_AppendGkiSigningArgs_NoSigningKeyPath(self): 1718 # A non-GKI boot.img has no gki_signing_key_path. 1719 common.OPTIONS.info_dict = { 1720 # 'gki_signing_key_path': pubkey, 1721 'gki_signing_algorithm': 'SHA256_RSA4096', 1722 'gki_signing_signature_args': '--prop foo:bar', 1723 } 1724 1725 # Tests no --gki_signing_* args are appended if there is no 1726 # gki_signing_key_path. 1727 cmd = ['mkbootimg', '--header_version', '4'] 1728 expected_cmd = ['mkbootimg', '--header_version', '4'] 1729 common.AppendGkiSigningArgs(cmd) 1730 self.assertEqual(cmd, expected_cmd) 1731 1732 def test_AppendGkiSigningArgs_NoSigningAlgorithm(self): 1733 pubkey = os.path.join(self.testdata_dir, 'testkey_gki.pem') 1734 with open(pubkey, 'wb') as f: 1735 f.write(b'\x00' * 100) 1736 self.assertTrue(os.path.exists(pubkey)) 1737 1738 # Tests no --gki_signing_* args are appended if there is no 1739 # gki_signing_algorithm. 1740 common.OPTIONS.info_dict = { 1741 'gki_signing_key_path': pubkey, 1742 # 'gki_signing_algorithm': 'SHA256_RSA4096', 1743 'gki_signing_signature_args': '--prop foo:bar', 1744 } 1745 1746 cmd = ['mkbootimg', '--header_version', '4'] 1747 expected_cmd = ['mkbootimg', '--header_version', '4'] 1748 common.AppendGkiSigningArgs(cmd) 1749 self.assertEqual(cmd, expected_cmd) 1750 1751 @test_utils.SkipIfExternalToolsUnavailable() 1752 def test_AppendGkiSigningArgs(self): 1753 pubkey = os.path.join(self.testdata_dir, 'testkey_gki.pem') 1754 with open(pubkey, 'wb') as f: 1755 f.write(b'\x00' * 100) 1756 self.assertTrue(os.path.exists(pubkey)) 1757 1758 common.OPTIONS.info_dict = { 1759 'gki_signing_key_path': pubkey, 1760 'gki_signing_algorithm': 'SHA256_RSA4096', 1761 'gki_signing_signature_args': '--prop foo:bar', 1762 } 1763 cmd = ['mkbootimg', '--header_version', '4'] 1764 common.AppendGkiSigningArgs(cmd) 1765 1766 expected_cmd = [ 1767 'mkbootimg', '--header_version', '4', 1768 '--gki_signing_key', pubkey, 1769 '--gki_signing_algorithm', 'SHA256_RSA4096', 1770 '--gki_signing_signature_args', '--prop foo:bar' 1771 ] 1772 self.assertEqual(cmd, expected_cmd) 1773 1774 @test_utils.SkipIfExternalToolsUnavailable() 1775 def test_AppendGkiSigningArgs_KeyPathNotFound(self): 1776 pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem') 1777 self.assertFalse(os.path.exists(pubkey)) 1778 1779 common.OPTIONS.info_dict = { 1780 'gki_signing_key_path': pubkey, 1781 'gki_signing_algorithm': 'SHA256_RSA4096', 1782 'gki_signing_signature_args': '--prop foo:bar', 1783 } 1784 cmd = ['mkbootimg', '--header_version', '4'] 1785 self.assertRaises(common.ExternalError, common.AppendGkiSigningArgs, cmd) 1786 1787 @test_utils.SkipIfExternalToolsUnavailable() 1788 def test_AppendGkiSigningArgs_SearchKeyPath(self): 1789 pubkey = 'testkey_gki.pem' 1790 self.assertFalse(os.path.exists(pubkey)) 1791 1792 # Tests it should replace the pubkey with an existed key under 1793 # OPTIONS.search_path, i.e., os.path.join(OPTIONS.search_path, pubkey). 1794 search_path_dir = common.MakeTempDir() 1795 search_pubkey = os.path.join(search_path_dir, pubkey) 1796 with open(search_pubkey, 'wb') as f: 1797 f.write(b'\x00' * 100) 1798 self.assertTrue(os.path.exists(search_pubkey)) 1799 1800 common.OPTIONS.search_path = search_path_dir 1801 common.OPTIONS.info_dict = { 1802 'gki_signing_key_path': pubkey, 1803 'gki_signing_algorithm': 'SHA256_RSA4096', 1804 'gki_signing_signature_args': '--prop foo:bar', 1805 } 1806 cmd = ['mkbootimg', '--header_version', '4'] 1807 common.AppendGkiSigningArgs(cmd) 1808 1809 expected_cmd = [ 1810 'mkbootimg', '--header_version', '4', 1811 '--gki_signing_key', search_pubkey, 1812 '--gki_signing_algorithm', 'SHA256_RSA4096', 1813 '--gki_signing_signature_args', '--prop foo:bar' 1814 ] 1815 self.assertEqual(cmd, expected_cmd) 1816 1817 @test_utils.SkipIfExternalToolsUnavailable() 1818 def test_AppendGkiSigningArgs_SearchKeyPathNotFound(self): 1819 pubkey = 'no_testkey_gki.pem' 1820 self.assertFalse(os.path.exists(pubkey)) 1821 1822 # Tests it should raise ExternalError if no key found under 1823 # OPTIONS.search_path. 1824 search_path_dir = common.MakeTempDir() 1825 search_pubkey = os.path.join(search_path_dir, pubkey) 1826 self.assertFalse(os.path.exists(search_pubkey)) 1827 1828 common.OPTIONS.search_path = search_path_dir 1829 common.OPTIONS.info_dict = { 1830 'gki_signing_key_path': pubkey, 1831 'gki_signing_algorithm': 'SHA256_RSA4096', 1832 'gki_signing_signature_args': '--prop foo:bar', 1833 } 1834 cmd = ['mkbootimg', '--header_version', '4'] 1835 self.assertRaises(common.ExternalError, common.AppendGkiSigningArgs, cmd) 1836 1837 1838class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase): 1839 """Checks the format of install-recovery.sh. 1840 1841 Its format should match between common.py and validate_target_files.py. 1842 """ 1843 1844 def setUp(self): 1845 self._tempdir = common.MakeTempDir() 1846 # Create a fake dict that contains the fstab info for boot&recovery. 1847 self._info = {"fstab" : {}} 1848 fake_fstab = [ 1849 "/dev/soc.0/by-name/boot /boot emmc defaults defaults", 1850 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"] 1851 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab) 1852 # Construct the gzipped recovery.img and boot.img 1853 self.recovery_data = bytearray([ 1854 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a, 1855 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3, 1856 0x08, 0x00, 0x00, 0x00 1857 ]) 1858 # echo -n "boot" | gzip -f | hd 1859 self.boot_data = bytearray([ 1860 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca, 1861 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00 1862 ]) 1863 1864 def _out_tmp_sink(self, name, data, prefix="SYSTEM"): 1865 loc = os.path.join(self._tempdir, prefix, name) 1866 if not os.path.exists(os.path.dirname(loc)): 1867 os.makedirs(os.path.dirname(loc)) 1868 with open(loc, "wb") as f: 1869 f.write(data) 1870 1871 def test_full_recovery(self): 1872 recovery_image = common.File("recovery.img", self.recovery_data) 1873 boot_image = common.File("boot.img", self.boot_data) 1874 self._info["full_recovery_image"] = "true" 1875 1876 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1877 recovery_image, boot_image, self._info) 1878 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1879 self._info) 1880 1881 @test_utils.SkipIfExternalToolsUnavailable() 1882 def test_recovery_from_boot(self): 1883 recovery_image = common.File("recovery.img", self.recovery_data) 1884 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES") 1885 boot_image = common.File("boot.img", self.boot_data) 1886 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES") 1887 1888 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1889 recovery_image, boot_image, self._info) 1890 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1891 self._info) 1892 # Validate 'recovery-from-boot' with bonus argument. 1893 self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM") 1894 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1895 recovery_image, boot_image, self._info) 1896 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1897 self._info) 1898 1899 1900class MockBlockDifference(object): 1901 1902 def __init__(self, partition, tgt, src=None): 1903 self.partition = partition 1904 self.tgt = tgt 1905 self.src = src 1906 1907 def WriteScript(self, script, _, progress=None, 1908 write_verify_script=False): 1909 if progress: 1910 script.AppendExtra("progress({})".format(progress)) 1911 script.AppendExtra("patch({});".format(self.partition)) 1912 if write_verify_script: 1913 self.WritePostInstallVerifyScript(script) 1914 1915 def WritePostInstallVerifyScript(self, script): 1916 script.AppendExtra("verify({});".format(self.partition)) 1917 1918 1919class FakeSparseImage(object): 1920 1921 def __init__(self, size): 1922 self.blocksize = 4096 1923 self.total_blocks = size // 4096 1924 assert size % 4096 == 0, "{} is not a multiple of 4096".format(size) 1925 1926 1927class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase): 1928 1929 @staticmethod 1930 def get_op_list(output_path): 1931 with zipfile.ZipFile(output_path, allowZip64=True) as output_zip: 1932 with output_zip.open('dynamic_partitions_op_list') as op_list: 1933 return [line.decode().strip() for line in op_list.readlines() 1934 if not line.startswith(b'#')] 1935 1936 def setUp(self): 1937 self.script = test_utils.MockScriptWriter() 1938 self.output_path = common.MakeTempFile(suffix='.zip') 1939 1940 def test_full(self): 1941 target_info = common.LoadDictionaryFromLines(""" 1942dynamic_partition_list=system vendor 1943super_partition_groups=group_foo 1944super_group_foo_group_size={group_size} 1945super_group_foo_partition_list=system vendor 1946""".format(group_size=4 * GiB).split("\n")) 1947 block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)), 1948 MockBlockDifference("vendor", FakeSparseImage(1 * GiB))] 1949 1950 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs) 1951 with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip: 1952 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1953 1954 self.assertEqual(str(self.script).strip(), """ 1955assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list"))); 1956patch(system); 1957verify(system); 1958unmap_partition("system"); 1959patch(vendor); 1960verify(vendor); 1961unmap_partition("vendor"); 1962""".strip()) 1963 1964 lines = self.get_op_list(self.output_path) 1965 1966 remove_all_groups = lines.index("remove_all_groups") 1967 add_group = lines.index("add_group group_foo 4294967296") 1968 add_vendor = lines.index("add vendor group_foo") 1969 add_system = lines.index("add system group_foo") 1970 resize_vendor = lines.index("resize vendor 1073741824") 1971 resize_system = lines.index("resize system 3221225472") 1972 1973 self.assertLess(remove_all_groups, add_group, 1974 "Should add groups after removing all groups") 1975 self.assertLess(add_group, min(add_vendor, add_system), 1976 "Should add partitions after adding group") 1977 self.assertLess(add_system, resize_system, 1978 "Should resize system after adding it") 1979 self.assertLess(add_vendor, resize_vendor, 1980 "Should resize vendor after adding it") 1981 1982 def test_inc_groups(self): 1983 source_info = common.LoadDictionaryFromLines(""" 1984super_partition_groups=group_foo group_bar group_baz 1985super_group_foo_group_size={group_foo_size} 1986super_group_bar_group_size={group_bar_size} 1987""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n")) 1988 target_info = common.LoadDictionaryFromLines(""" 1989super_partition_groups=group_foo group_baz group_qux 1990super_group_foo_group_size={group_foo_size} 1991super_group_baz_group_size={group_baz_size} 1992super_group_qux_group_size={group_qux_size} 1993""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB, 1994 group_qux_size=1 * GiB).split("\n")) 1995 1996 dp_diff = common.DynamicPartitionsDifference(target_info, 1997 block_diffs=[], 1998 source_info_dict=source_info) 1999 with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip: 2000 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 2001 2002 lines = self.get_op_list(self.output_path) 2003 2004 removed = lines.index("remove_group group_bar") 2005 shrunk = lines.index("resize_group group_foo 3221225472") 2006 grown = lines.index("resize_group group_baz 4294967296") 2007 added = lines.index("add_group group_qux 1073741824") 2008 2009 self.assertLess(max(removed, shrunk), 2010 min(grown, added), 2011 "ops that remove / shrink partitions must precede ops that " 2012 "grow / add partitions") 2013 2014 def test_incremental(self): 2015 source_info = common.LoadDictionaryFromLines(""" 2016dynamic_partition_list=system vendor product system_ext 2017super_partition_groups=group_foo 2018super_group_foo_group_size={group_foo_size} 2019super_group_foo_partition_list=system vendor product system_ext 2020""".format(group_foo_size=4 * GiB).split("\n")) 2021 target_info = common.LoadDictionaryFromLines(""" 2022dynamic_partition_list=system vendor product odm 2023super_partition_groups=group_foo group_bar 2024super_group_foo_group_size={group_foo_size} 2025super_group_foo_partition_list=system vendor odm 2026super_group_bar_group_size={group_bar_size} 2027super_group_bar_partition_list=product 2028""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n")) 2029 2030 block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB), 2031 src=FakeSparseImage(1024 * MiB)), 2032 MockBlockDifference("vendor", FakeSparseImage(512 * MiB), 2033 src=FakeSparseImage(1024 * MiB)), 2034 MockBlockDifference("product", FakeSparseImage(1024 * MiB), 2035 src=FakeSparseImage(1024 * MiB)), 2036 MockBlockDifference("system_ext", None, 2037 src=FakeSparseImage(1024 * MiB)), 2038 MockBlockDifference("odm", FakeSparseImage(1024 * MiB), 2039 src=None)] 2040 2041 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs, 2042 source_info_dict=source_info) 2043 with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip: 2044 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 2045 2046 metadata_idx = self.script.lines.index( 2047 'assert(update_dynamic_partitions(package_extract_file(' 2048 '"dynamic_partitions_op_list")));') 2049 self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx) 2050 self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);')) 2051 for p in ("product", "system", "odm"): 2052 patch_idx = self.script.lines.index("patch({});".format(p)) 2053 verify_idx = self.script.lines.index("verify({});".format(p)) 2054 self.assertLess(metadata_idx, patch_idx, 2055 "Should patch {} after updating metadata".format(p)) 2056 self.assertLess(patch_idx, verify_idx, 2057 "Should verify {} after patching".format(p)) 2058 2059 self.assertNotIn("patch(system_ext);", self.script.lines) 2060 2061 lines = self.get_op_list(self.output_path) 2062 2063 remove = lines.index("remove system_ext") 2064 move_product_out = lines.index("move product default") 2065 shrink = lines.index("resize vendor 536870912") 2066 shrink_group = lines.index("resize_group group_foo 3221225472") 2067 add_group_bar = lines.index("add_group group_bar 1073741824") 2068 add_odm = lines.index("add odm group_foo") 2069 grow_existing = lines.index("resize system 1610612736") 2070 grow_added = lines.index("resize odm 1073741824") 2071 move_product_in = lines.index("move product group_bar") 2072 2073 max_idx_move_partition_out_foo = max(remove, move_product_out, shrink) 2074 min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added) 2075 2076 self.assertLess(max_idx_move_partition_out_foo, shrink_group, 2077 "Must shrink group after partitions inside group are shrunk" 2078 " / removed") 2079 2080 self.assertLess(add_group_bar, move_product_in, 2081 "Must add partitions to group after group is added") 2082 2083 self.assertLess(max_idx_move_partition_out_foo, 2084 min_idx_move_partition_in_foo, 2085 "Must shrink partitions / remove partitions from group" 2086 "before adding / moving partitions into group") 2087 2088 def test_remove_partition(self): 2089 source_info = common.LoadDictionaryFromLines(""" 2090blockimgdiff_versions=3,4 2091use_dynamic_partitions=true 2092dynamic_partition_list=foo 2093super_partition_groups=group_foo 2094super_group_foo_group_size={group_foo_size} 2095super_group_foo_partition_list=foo 2096""".format(group_foo_size=4 * GiB).split("\n")) 2097 target_info = common.LoadDictionaryFromLines(""" 2098blockimgdiff_versions=3,4 2099use_dynamic_partitions=true 2100super_partition_groups=group_foo 2101super_group_foo_group_size={group_foo_size} 2102""".format(group_foo_size=4 * GiB).split("\n")) 2103 2104 common.OPTIONS.info_dict = target_info 2105 common.OPTIONS.target_info_dict = target_info 2106 common.OPTIONS.source_info_dict = source_info 2107 common.OPTIONS.cache_size = 4 * 4096 2108 2109 block_diffs = [common.BlockDifference("foo", EmptyImage(), 2110 src=DataImage("source", pad=True))] 2111 2112 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs, 2113 source_info_dict=source_info) 2114 with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip: 2115 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 2116 2117 self.assertNotIn("block_image_update", str(self.script), 2118 "Removed partition should not be patched.") 2119 2120 lines = self.get_op_list(self.output_path) 2121 self.assertEqual(lines, ["remove foo"]) 2122 2123 2124class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase): 2125 def setUp(self): 2126 self.odm_build_prop = [ 2127 'ro.odm.build.date.utc=1578430045', 2128 'ro.odm.build.fingerprint=' 2129 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2130 'ro.product.odm.device=coral', 2131 'import /odm/etc/build_${ro.boot.product.device_name}.prop', 2132 ] 2133 2134 @staticmethod 2135 def _BuildZipFile(entries): 2136 input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip') 2137 with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip: 2138 for name, content in entries.items(): 2139 input_zip.writestr(name, content) 2140 2141 return input_file 2142 2143 def test_parseBuildProps_noImportStatement(self): 2144 build_prop = [ 2145 'ro.odm.build.date.utc=1578430045', 2146 'ro.odm.build.fingerprint=' 2147 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2148 'ro.product.odm.device=coral', 2149 ] 2150 input_file = self._BuildZipFile({ 2151 'ODM/etc/build.prop': '\n'.join(build_prop), 2152 }) 2153 2154 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2155 placeholder_values = { 2156 'ro.boot.product.device_name': ['std', 'pro'] 2157 } 2158 partition_props = common.PartitionBuildProps.FromInputFile( 2159 input_zip, 'odm', placeholder_values) 2160 2161 self.assertEqual({ 2162 'ro.odm.build.date.utc': '1578430045', 2163 'ro.odm.build.fingerprint': 2164 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2165 'ro.product.odm.device': 'coral', 2166 }, partition_props.build_props) 2167 2168 self.assertEqual(set(), partition_props.prop_overrides) 2169 2170 def test_parseBuildProps_singleImportStatement(self): 2171 build_std_prop = [ 2172 'ro.product.odm.device=coral', 2173 'ro.product.odm.name=product1', 2174 ] 2175 build_pro_prop = [ 2176 'ro.product.odm.device=coralpro', 2177 'ro.product.odm.name=product2', 2178 ] 2179 2180 input_file = self._BuildZipFile({ 2181 'ODM/etc/build.prop': '\n'.join(self.odm_build_prop), 2182 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 2183 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 2184 }) 2185 2186 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2187 placeholder_values = { 2188 'ro.boot.product.device_name': 'std' 2189 } 2190 partition_props = common.PartitionBuildProps.FromInputFile( 2191 input_zip, 'odm', placeholder_values) 2192 2193 self.assertEqual({ 2194 'ro.odm.build.date.utc': '1578430045', 2195 'ro.odm.build.fingerprint': 2196 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2197 'ro.product.odm.device': 'coral', 2198 'ro.product.odm.name': 'product1', 2199 }, partition_props.build_props) 2200 2201 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2202 placeholder_values = { 2203 'ro.boot.product.device_name': 'pro' 2204 } 2205 partition_props = common.PartitionBuildProps.FromInputFile( 2206 input_zip, 'odm', placeholder_values) 2207 2208 self.assertEqual({ 2209 'ro.odm.build.date.utc': '1578430045', 2210 'ro.odm.build.fingerprint': 2211 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2212 'ro.product.odm.device': 'coralpro', 2213 'ro.product.odm.name': 'product2', 2214 }, partition_props.build_props) 2215 2216 def test_parseBuildProps_noPlaceHolders(self): 2217 build_prop = copy.copy(self.odm_build_prop) 2218 input_file = self._BuildZipFile({ 2219 'ODM/etc/build.prop': '\n'.join(build_prop), 2220 }) 2221 2222 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2223 partition_props = common.PartitionBuildProps.FromInputFile( 2224 input_zip, 'odm') 2225 2226 self.assertEqual({ 2227 'ro.odm.build.date.utc': '1578430045', 2228 'ro.odm.build.fingerprint': 2229 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2230 'ro.product.odm.device': 'coral', 2231 }, partition_props.build_props) 2232 2233 self.assertEqual(set(), partition_props.prop_overrides) 2234 2235 def test_parseBuildProps_multipleImportStatements(self): 2236 build_prop = copy.deepcopy(self.odm_build_prop) 2237 build_prop.append( 2238 'import /odm/etc/build_${ro.boot.product.product_name}.prop') 2239 2240 build_std_prop = [ 2241 'ro.product.odm.device=coral', 2242 ] 2243 build_pro_prop = [ 2244 'ro.product.odm.device=coralpro', 2245 ] 2246 2247 product1_prop = [ 2248 'ro.product.odm.name=product1', 2249 'ro.product.not_care=not_care', 2250 ] 2251 2252 product2_prop = [ 2253 'ro.product.odm.name=product2', 2254 'ro.product.not_care=not_care', 2255 ] 2256 2257 input_file = self._BuildZipFile({ 2258 'ODM/etc/build.prop': '\n'.join(build_prop), 2259 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 2260 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 2261 'ODM/etc/build_product1.prop': '\n'.join(product1_prop), 2262 'ODM/etc/build_product2.prop': '\n'.join(product2_prop), 2263 }) 2264 2265 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2266 placeholder_values = { 2267 'ro.boot.product.device_name': 'std', 2268 'ro.boot.product.product_name': 'product1', 2269 'ro.boot.product.not_care': 'not_care', 2270 } 2271 partition_props = common.PartitionBuildProps.FromInputFile( 2272 input_zip, 'odm', placeholder_values) 2273 2274 self.assertEqual({ 2275 'ro.odm.build.date.utc': '1578430045', 2276 'ro.odm.build.fingerprint': 2277 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2278 'ro.product.odm.device': 'coral', 2279 'ro.product.odm.name': 'product1' 2280 }, partition_props.build_props) 2281 2282 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2283 placeholder_values = { 2284 'ro.boot.product.device_name': 'pro', 2285 'ro.boot.product.product_name': 'product2', 2286 'ro.boot.product.not_care': 'not_care', 2287 } 2288 partition_props = common.PartitionBuildProps.FromInputFile( 2289 input_zip, 'odm', placeholder_values) 2290 2291 self.assertEqual({ 2292 'ro.odm.build.date.utc': '1578430045', 2293 'ro.odm.build.fingerprint': 2294 'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys', 2295 'ro.product.odm.device': 'coralpro', 2296 'ro.product.odm.name': 'product2' 2297 }, partition_props.build_props) 2298 2299 def test_parseBuildProps_defineAfterOverride(self): 2300 build_prop = copy.deepcopy(self.odm_build_prop) 2301 build_prop.append('ro.product.odm.device=coral') 2302 2303 build_std_prop = [ 2304 'ro.product.odm.device=coral', 2305 ] 2306 build_pro_prop = [ 2307 'ro.product.odm.device=coralpro', 2308 ] 2309 2310 input_file = self._BuildZipFile({ 2311 'ODM/etc/build.prop': '\n'.join(build_prop), 2312 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 2313 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 2314 }) 2315 2316 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2317 placeholder_values = { 2318 'ro.boot.product.device_name': 'std', 2319 } 2320 2321 self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile, 2322 input_zip, 'odm', placeholder_values) 2323 2324 def test_parseBuildProps_duplicateOverride(self): 2325 build_prop = copy.deepcopy(self.odm_build_prop) 2326 build_prop.append( 2327 'import /odm/etc/build_${ro.boot.product.product_name}.prop') 2328 2329 build_std_prop = [ 2330 'ro.product.odm.device=coral', 2331 'ro.product.odm.name=product1', 2332 ] 2333 build_pro_prop = [ 2334 'ro.product.odm.device=coralpro', 2335 ] 2336 2337 product1_prop = [ 2338 'ro.product.odm.name=product1', 2339 ] 2340 2341 product2_prop = [ 2342 'ro.product.odm.name=product2', 2343 ] 2344 2345 input_file = self._BuildZipFile({ 2346 'ODM/etc/build.prop': '\n'.join(build_prop), 2347 'ODM/etc/build_std.prop': '\n'.join(build_std_prop), 2348 'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop), 2349 'ODM/etc/build_product1.prop': '\n'.join(product1_prop), 2350 'ODM/etc/build_product2.prop': '\n'.join(product2_prop), 2351 }) 2352 2353 with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip: 2354 placeholder_values = { 2355 'ro.boot.product.device_name': 'std', 2356 'ro.boot.product.product_name': 'product1', 2357 } 2358 self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile, 2359 input_zip, 'odm', placeholder_values) 2360