• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import json
19import os
20import subprocess
21import tempfile
22import time
23import unittest
24import zipfile
25from hashlib import sha1
26
27import common
28import test_utils
29import validate_target_files
30from images import EmptyImage, DataImage
31from rangelib import RangeSet
32
33
34KiB = 1024
35MiB = 1024 * KiB
36GiB = 1024 * MiB
37
38
39def get_2gb_string():
40  size = int(2 * GiB + 1)
41  block_size = 4 * KiB
42  step_size = 4 * MiB
43  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
44  for _ in range(0, size, step_size):
45    yield os.urandom(block_size)
46    yield b'\0' * (step_size - block_size)
47
48
49class BuildInfoTest(test_utils.ReleaseToolsTestCase):
50
51  TEST_INFO_FINGERPRINT_DICT = {
52      'build.prop': common.PartitionBuildProps.FromDictionary(
53          'system', {
54              'ro.product.brand': 'product-brand',
55              'ro.product.name': 'product-name',
56              'ro.product.device': 'product-device',
57              'ro.build.version.release': 'version-release',
58              'ro.build.id': 'build-id',
59              'ro.build.version.incremental': 'version-incremental',
60              'ro.build.type': 'build-type',
61              'ro.build.tags': 'build-tags',
62              'ro.build.version.sdk': 30,
63          }
64      ),
65  }
66
67  TEST_INFO_DICT = {
68      'build.prop': common.PartitionBuildProps.FromDictionary(
69          'system', {
70              'ro.product.device': 'product-device',
71              'ro.product.name': 'product-name',
72              'ro.build.fingerprint': 'build-fingerprint',
73              'ro.build.foo': 'build-foo'}
74      ),
75      'system.build.prop': common.PartitionBuildProps.FromDictionary(
76          'system', {
77              'ro.product.system.brand': 'product-brand',
78              'ro.product.system.name': 'product-name',
79              'ro.product.system.device': 'product-device',
80              'ro.system.build.version.release': 'version-release',
81              'ro.system.build.id': 'build-id',
82              'ro.system.build.version.incremental': 'version-incremental',
83              'ro.system.build.type': 'build-type',
84              'ro.system.build.tags': 'build-tags',
85              'ro.system.build.foo': 'build-foo'}
86      ),
87      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
88          'vendor', {
89              'ro.product.vendor.brand': 'vendor-product-brand',
90              'ro.product.vendor.name': 'vendor-product-name',
91              'ro.product.vendor.device': 'vendor-product-device',
92              'ro.vendor.build.version.release': 'vendor-version-release',
93              'ro.vendor.build.id': 'vendor-build-id',
94              'ro.vendor.build.version.incremental':
95              'vendor-version-incremental',
96              'ro.vendor.build.type': 'vendor-build-type',
97              'ro.vendor.build.tags': 'vendor-build-tags'}
98      ),
99      'property1': 'value1',
100      'property2': 4096,
101  }
102
103  TEST_INFO_DICT_USES_OEM_PROPS = {
104      'build.prop': common.PartitionBuildProps.FromDictionary(
105          'system', {
106              'ro.product.name': 'product-name',
107              'ro.build.thumbprint': 'build-thumbprint',
108              'ro.build.bar': 'build-bar'}
109      ),
110      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
111          'vendor', {
112              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
113      ),
114      'property1': 'value1',
115      'property2': 4096,
116      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
117  }
118
119  TEST_OEM_DICTS = [
120      {
121          'ro.product.brand': 'brand1',
122          'ro.product.device': 'device1',
123      },
124      {
125          'ro.product.brand': 'brand2',
126          'ro.product.device': 'device2',
127      },
128      {
129          'ro.product.brand': 'brand3',
130          'ro.product.device': 'device3',
131      },
132  ]
133
134  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = {
135      'build.prop': common.PartitionBuildProps.FromDictionary(
136          'system', {
137              'ro.build.fingerprint': 'build-fingerprint',
138              'ro.product.property_source_order':
139                  'product,odm,vendor,system_ext,system'}
140      ),
141      'system.build.prop': common.PartitionBuildProps.FromDictionary(
142          'system', {
143              'ro.product.system.device': 'system-product-device'}
144      ),
145      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
146          'vendor', {
147              'ro.product.vendor.device': 'vendor-product-device'}
148      ),
149  }
150
151  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = {
152      'build.prop': common.PartitionBuildProps.FromDictionary(
153          'system', {
154              'ro.build.fingerprint': 'build-fingerprint',
155              'ro.product.property_source_order':
156                  'product,product_services,odm,vendor,system',
157              'ro.build.version.release': '10',
158              'ro.build.version.codename': 'REL'}
159      ),
160      'system.build.prop': common.PartitionBuildProps.FromDictionary(
161          'system', {
162              'ro.product.system.device': 'system-product-device'}
163      ),
164      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
165          'vendor', {
166              'ro.product.vendor.device': 'vendor-product-device'}
167      ),
168  }
169
170  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = {
171      'build.prop': common.PartitionBuildProps.FromDictionary(
172          'system', {
173              'ro.product.device': 'product-device',
174              'ro.build.fingerprint': 'build-fingerprint',
175              'ro.build.version.release': '9',
176              'ro.build.version.codename': 'REL'}
177      ),
178      'system.build.prop': common.PartitionBuildProps.FromDictionary(
179          'system', {
180              'ro.product.system.device': 'system-product-device'}
181      ),
182      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
183          'vendor', {
184              'ro.product.vendor.device': 'vendor-product-device'}
185      ),
186  }
187
188  def test_init(self):
189    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
190    self.assertEqual('product-device', target_info.device)
191    self.assertEqual('build-fingerprint', target_info.fingerprint)
192    self.assertFalse(target_info.is_ab)
193    self.assertIsNone(target_info.oem_props)
194
195  def test_init_with_oem_props(self):
196    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
197                                   self.TEST_OEM_DICTS)
198    self.assertEqual('device1', target_info.device)
199    self.assertEqual('brand1/product-name/device1:build-thumbprint',
200                     target_info.fingerprint)
201
202    # Swap the order in oem_dicts, which would lead to different BuildInfo.
203    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
204    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
205    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
206                                   oem_dicts)
207    self.assertEqual('device3', target_info.device)
208    self.assertEqual('brand3/product-name/device3:build-thumbprint',
209                     target_info.fingerprint)
210
211  def test_init_badFingerprint(self):
212    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
213    info_dict['build.prop'].build_props[
214        'ro.build.fingerprint'] = 'bad fingerprint'
215    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
216
217    info_dict['build.prop'].build_props[
218        'ro.build.fingerprint'] = 'bad\x80fingerprint'
219    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
220
221  def test_init_goodFingerprint(self):
222    info_dict = copy.deepcopy(self.TEST_INFO_FINGERPRINT_DICT)
223    build_info = common.BuildInfo(info_dict)
224    self.assertEqual(
225      'product-brand/product-name/product-device:version-release/build-id/'
226      'version-incremental:build-type/build-tags', build_info.fingerprint)
227
228    build_props = info_dict['build.prop'].build_props
229    del build_props['ro.build.id']
230    build_props['ro.build.legacy.id'] = 'legacy-build-id'
231    build_info = common.BuildInfo(info_dict, use_legacy_id=True)
232    self.assertEqual(
233      'product-brand/product-name/product-device:version-release/'
234      'legacy-build-id/version-incremental:build-type/build-tags',
235      build_info.fingerprint)
236
237    self.assertRaises(common.ExternalError, common.BuildInfo, info_dict, None,
238                      False)
239
240    info_dict['avb_enable'] = 'true'
241    info_dict['vbmeta_digest'] = 'abcde12345'
242    build_info = common.BuildInfo(info_dict, use_legacy_id=False)
243    self.assertEqual(
244      'product-brand/product-name/product-device:version-release/'
245      'legacy-build-id.abcde123/version-incremental:build-type/build-tags',
246      build_info.fingerprint)
247
248  def test___getitem__(self):
249    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
250    self.assertEqual('value1', target_info['property1'])
251    self.assertEqual(4096, target_info['property2'])
252    self.assertEqual('build-foo',
253                     target_info['build.prop'].GetProp('ro.build.foo'))
254
255  def test___getitem__with_oem_props(self):
256    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
257                                   self.TEST_OEM_DICTS)
258    self.assertEqual('value1', target_info['property1'])
259    self.assertEqual(4096, target_info['property2'])
260    self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo'))
261
262  def test___setitem__(self):
263    target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
264    self.assertEqual('value1', target_info['property1'])
265    target_info['property1'] = 'value2'
266    self.assertEqual('value2', target_info['property1'])
267
268    self.assertEqual('build-foo',
269                     target_info['build.prop'].GetProp('ro.build.foo'))
270    target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar'
271    self.assertEqual('build-bar',
272                     target_info['build.prop'].GetProp('ro.build.foo'))
273
274  def test_get(self):
275    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
276    self.assertEqual('value1', target_info.get('property1'))
277    self.assertEqual(4096, target_info.get('property2'))
278    self.assertEqual(4096, target_info.get('property2', 1024))
279    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
280    self.assertEqual('build-foo',
281                     target_info.get('build.prop').GetProp('ro.build.foo'))
282
283  def test_get_with_oem_props(self):
284    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
285                                   self.TEST_OEM_DICTS)
286    self.assertEqual('value1', target_info.get('property1'))
287    self.assertEqual(4096, target_info.get('property2'))
288    self.assertEqual(4096, target_info.get('property2', 1024))
289    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
290    self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo'))
291
292  def test_items(self):
293    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
294    items = target_info.items()
295    self.assertIn(('property1', 'value1'), items)
296    self.assertIn(('property2', 4096), items)
297
298  def test_GetBuildProp(self):
299    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
300    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
301    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
302                      'ro.build.nonexistent')
303
304  def test_GetBuildProp_with_oem_props(self):
305    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
306                                   self.TEST_OEM_DICTS)
307    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
308    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
309                      'ro.build.nonexistent')
310
311  def test_GetPartitionFingerprint(self):
312    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
313    self.assertEqual(
314        target_info.GetPartitionFingerprint('vendor'),
315        'vendor-product-brand/vendor-product-name/vendor-product-device'
316        ':vendor-version-release/vendor-build-id/vendor-version-incremental'
317        ':vendor-build-type/vendor-build-tags')
318
319  def test_GetPartitionFingerprint_system_other_uses_system(self):
320    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
321    self.assertEqual(
322        target_info.GetPartitionFingerprint('system_other'),
323        target_info.GetPartitionFingerprint('system'))
324
325  def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self):
326    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
327    info_dict['vendor.build.prop'].build_props[
328        'ro.vendor.build.fingerprint'] = 'vendor:fingerprint'
329    target_info = common.BuildInfo(info_dict, None)
330    self.assertEqual(
331        target_info.GetPartitionFingerprint('vendor'),
332        'vendor:fingerprint')
333
334  def test_WriteMountOemScript(self):
335    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
336                                   self.TEST_OEM_DICTS)
337    script_writer = test_utils.MockScriptWriter()
338    target_info.WriteMountOemScript(script_writer)
339    self.assertEqual([('Mount', '/oem', None)], script_writer.lines)
340
341  def test_WriteDeviceAssertions(self):
342    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
343    script_writer = test_utils.MockScriptWriter()
344    target_info.WriteDeviceAssertions(script_writer, False)
345    self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines)
346
347  def test_WriteDeviceAssertions_with_oem_props(self):
348    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
349                                   self.TEST_OEM_DICTS)
350    script_writer = test_utils.MockScriptWriter()
351    target_info.WriteDeviceAssertions(script_writer, False)
352    self.assertEqual(
353        [
354            ('AssertOemProperty', 'ro.product.device',
355             ['device1', 'device2', 'device3'], False),
356            ('AssertOemProperty', 'ro.product.brand',
357             ['brand1', 'brand2', 'brand3'], False),
358        ],
359        script_writer.lines)
360
361  def test_ResolveRoProductProperty_FromVendor(self):
362    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
363    info = common.BuildInfo(info_dict, None)
364    self.assertEqual('vendor-product-device',
365                     info.GetBuildProp('ro.product.device'))
366
367  def test_ResolveRoProductProperty_FromSystem(self):
368    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
369    del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device']
370    info = common.BuildInfo(info_dict, None)
371    self.assertEqual('system-product-device',
372                     info.GetBuildProp('ro.product.device'))
373
374  def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self):
375    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
376    info_dict['build.prop'].build_props[
377        'ro.product.property_source_order'] = 'bad-source'
378    with self.assertRaisesRegexp(common.ExternalError,
379        'Invalid ro.product.property_source_order'):
380      info = common.BuildInfo(info_dict, None)
381      info.GetBuildProp('ro.product.device')
382
383  def test_ResolveRoProductProperty_Android10PropertySearchOrder(self):
384    info_dict = copy.deepcopy(
385        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10)
386    info = common.BuildInfo(info_dict, None)
387    self.assertEqual('vendor-product-device',
388                     info.GetBuildProp('ro.product.device'))
389
390  def test_ResolveRoProductProperty_Android9PropertySearchOrder(self):
391    info_dict = copy.deepcopy(
392        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9)
393    info = common.BuildInfo(info_dict, None)
394    self.assertEqual('product-device',
395                     info.GetBuildProp('ro.product.device'))
396
397
398class CommonZipTest(test_utils.ReleaseToolsTestCase):
399
400  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
401              test_file_name=None, expected_stat=None, expected_mode=0o644,
402              expected_compress_type=zipfile.ZIP_STORED):
403    # Verify the stat if present.
404    if test_file_name is not None:
405      new_stat = os.stat(test_file_name)
406      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
407      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
408
409    # Reopen the zip file to verify.
410    zip_file = zipfile.ZipFile(zip_file_name, "r", allowZip64=True)
411
412    # Verify the timestamp.
413    info = zip_file.getinfo(arcname)
414    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
415
416    # Verify the file mode.
417    mode = (info.external_attr >> 16) & 0o777
418    self.assertEqual(mode, expected_mode)
419
420    # Verify the compress type.
421    self.assertEqual(info.compress_type, expected_compress_type)
422
423    # Verify the zip contents.
424    entry = zip_file.open(arcname)
425    sha1_hash = sha1()
426    for chunk in iter(lambda: entry.read(4 * MiB), b''):
427      sha1_hash.update(chunk)
428    self.assertEqual(expected_hash, sha1_hash.hexdigest())
429    self.assertIsNone(zip_file.testzip())
430
431  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
432    extra_zipwrite_args = dict(extra_zipwrite_args or {})
433
434    test_file = tempfile.NamedTemporaryFile(delete=False)
435    test_file_name = test_file.name
436
437    zip_file = tempfile.NamedTemporaryFile(delete=False)
438    zip_file_name = zip_file.name
439
440    # File names within an archive strip the leading slash.
441    arcname = extra_zipwrite_args.get("arcname", test_file_name)
442    if arcname[0] == "/":
443      arcname = arcname[1:]
444
445    zip_file.close()
446    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
447
448    try:
449      sha1_hash = sha1()
450      for data in contents:
451        sha1_hash.update(bytes(data))
452        test_file.write(bytes(data))
453      test_file.close()
454
455      expected_mode = extra_zipwrite_args.get("perms", 0o644)
456      expected_compress_type = extra_zipwrite_args.get("compress_type",
457                                                       zipfile.ZIP_STORED)
458
459      # Arbitrary timestamp, just to make sure common.ZipWrite() restores
460      # the timestamp after writing.
461      os.utime(test_file_name, (1234567, 1234567))
462      expected_stat = os.stat(test_file_name)
463      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
464      common.ZipClose(zip_file)
465
466      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
467                   test_file_name, expected_stat, expected_mode,
468                   expected_compress_type)
469    finally:
470      os.remove(test_file_name)
471      os.remove(zip_file_name)
472
473  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
474    extra_args = dict(extra_args or {})
475
476    zip_file = tempfile.NamedTemporaryFile(delete=False)
477    zip_file_name = zip_file.name
478    zip_file.close()
479
480    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
481
482    try:
483      expected_compress_type = extra_args.get("compress_type",
484                                              zipfile.ZIP_STORED)
485      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
486        arcname = zinfo_or_arcname
487        expected_mode = extra_args.get("perms", 0o644)
488      else:
489        arcname = zinfo_or_arcname.filename
490        if zinfo_or_arcname.external_attr:
491          zinfo_perms = zinfo_or_arcname.external_attr >> 16
492        else:
493          zinfo_perms = 0o600
494        expected_mode = extra_args.get("perms", zinfo_perms)
495
496      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
497      common.ZipClose(zip_file)
498
499      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
500                   expected_mode=expected_mode,
501                   expected_compress_type=expected_compress_type)
502    finally:
503      os.remove(zip_file_name)
504
505  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
506    extra_args = dict(extra_args or {})
507
508    zip_file = tempfile.NamedTemporaryFile(delete=False)
509    zip_file_name = zip_file.name
510
511    test_file = tempfile.NamedTemporaryFile(delete=False)
512    test_file_name = test_file.name
513
514    arcname_large = test_file_name
515    arcname_small = "bar"
516
517    # File names within an archive strip the leading slash.
518    if arcname_large[0] == "/":
519      arcname_large = arcname_large[1:]
520
521    zip_file.close()
522    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
523
524    try:
525      sha1_hash = sha1()
526      for data in large:
527        sha1_hash.update(data)
528        test_file.write(data)
529      test_file.close()
530
531      # Arbitrary timestamp, just to make sure common.ZipWrite() restores
532      # the timestamp after writing.
533      os.utime(test_file_name, (1234567, 1234567))
534      expected_stat = os.stat(test_file_name)
535      expected_mode = 0o644
536      expected_compress_type = extra_args.get("compress_type",
537                                              zipfile.ZIP_STORED)
538
539      common.ZipWrite(zip_file, test_file_name, **extra_args)
540      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
541      common.ZipClose(zip_file)
542
543      # Verify the contents written by ZipWrite().
544      self._verify(zip_file, zip_file_name, arcname_large,
545                   sha1_hash.hexdigest(), test_file_name, expected_stat,
546                   expected_mode, expected_compress_type)
547
548      # Verify the contents written by ZipWriteStr().
549      self._verify(zip_file, zip_file_name, arcname_small,
550                   sha1(small).hexdigest(),
551                   expected_compress_type=expected_compress_type)
552    finally:
553      os.remove(zip_file_name)
554      os.remove(test_file_name)
555
556  def _test_reset_ZIP64_LIMIT(self, func, *args):
557    default_limit = (1 << 31) - 1
558    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
559    func(*args)
560    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
561
562  def test_ZipWrite(self):
563    file_contents = os.urandom(1024)
564    self._test_ZipWrite(file_contents)
565
566  def test_ZipWrite_with_opts(self):
567    file_contents = os.urandom(1024)
568    self._test_ZipWrite(file_contents, {
569        "arcname": "foobar",
570        "perms": 0o777,
571        "compress_type": zipfile.ZIP_DEFLATED,
572    })
573    self._test_ZipWrite(file_contents, {
574        "arcname": "foobar",
575        "perms": 0o700,
576        "compress_type": zipfile.ZIP_STORED,
577    })
578
579  def test_ZipWrite_large_file(self):
580    file_contents = get_2gb_string()
581    self._test_ZipWrite(file_contents, {
582        "compress_type": zipfile.ZIP_DEFLATED,
583    })
584
585  def test_ZipWrite_resets_ZIP64_LIMIT(self):
586    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
587
588  def test_ZipWriteStr(self):
589    random_string = os.urandom(1024)
590    # Passing arcname
591    self._test_ZipWriteStr("foo", random_string)
592
593    # Passing zinfo
594    zinfo = zipfile.ZipInfo(filename="foo")
595    self._test_ZipWriteStr(zinfo, random_string)
596
597    # Timestamp in the zinfo should be overwritten.
598    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
599    self._test_ZipWriteStr(zinfo, random_string)
600
601  def test_ZipWriteStr_with_opts(self):
602    random_string = os.urandom(1024)
603    # Passing arcname
604    self._test_ZipWriteStr("foo", random_string, {
605        "perms": 0o700,
606        "compress_type": zipfile.ZIP_DEFLATED,
607    })
608    self._test_ZipWriteStr("bar", random_string, {
609        "compress_type": zipfile.ZIP_STORED,
610    })
611
612    # Passing zinfo
613    zinfo = zipfile.ZipInfo(filename="foo")
614    self._test_ZipWriteStr(zinfo, random_string, {
615        "compress_type": zipfile.ZIP_DEFLATED,
616    })
617    self._test_ZipWriteStr(zinfo, random_string, {
618        "perms": 0o600,
619        "compress_type": zipfile.ZIP_STORED,
620    })
621    self._test_ZipWriteStr(zinfo, random_string, {
622        "perms": 0o000,
623        "compress_type": zipfile.ZIP_STORED,
624    })
625
626  def test_ZipWriteStr_large_file(self):
627    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
628    # the workaround. We will only test the case of writing a string into a
629    # large archive.
630    long_string = get_2gb_string()
631    short_string = os.urandom(1024)
632    self._test_ZipWriteStr_large_file(long_string, short_string, {
633        "compress_type": zipfile.ZIP_DEFLATED,
634    })
635
636  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
637    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'')
638    zinfo = zipfile.ZipInfo(filename="foo")
639    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'')
640
641  def test_bug21309935(self):
642    zip_file = tempfile.NamedTemporaryFile(delete=False)
643    zip_file_name = zip_file.name
644    zip_file.close()
645
646    try:
647      random_string = os.urandom(1024)
648      zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
649      # Default perms should be 0o644 when passing the filename.
650      common.ZipWriteStr(zip_file, "foo", random_string)
651      # Honor the specified perms.
652      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
653      # The perms in zinfo should be untouched.
654      zinfo = zipfile.ZipInfo(filename="baz")
655      zinfo.external_attr = 0o740 << 16
656      common.ZipWriteStr(zip_file, zinfo, random_string)
657      # Explicitly specified perms has the priority.
658      zinfo = zipfile.ZipInfo(filename="qux")
659      zinfo.external_attr = 0o700 << 16
660      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
661      common.ZipClose(zip_file)
662
663      self._verify(zip_file, zip_file_name, "foo",
664                   sha1(random_string).hexdigest(),
665                   expected_mode=0o644)
666      self._verify(zip_file, zip_file_name, "bar",
667                   sha1(random_string).hexdigest(),
668                   expected_mode=0o755)
669      self._verify(zip_file, zip_file_name, "baz",
670                   sha1(random_string).hexdigest(),
671                   expected_mode=0o740)
672      self._verify(zip_file, zip_file_name, "qux",
673                   sha1(random_string).hexdigest(),
674                   expected_mode=0o400)
675    finally:
676      os.remove(zip_file_name)
677
678  @test_utils.SkipIfExternalToolsUnavailable()
679  def test_ZipDelete(self):
680    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
681    output_zip = zipfile.ZipFile(zip_file.name, 'w',
682                                 compression=zipfile.ZIP_DEFLATED)
683    with tempfile.NamedTemporaryFile() as entry_file:
684      entry_file.write(os.urandom(1024))
685      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
686      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
687      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
688      common.ZipClose(output_zip)
689    zip_file.close()
690
691    try:
692      common.ZipDelete(zip_file.name, 'Test2')
693      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
694        entries = check_zip.namelist()
695        self.assertTrue('Test1' in entries)
696        self.assertFalse('Test2' in entries)
697        self.assertTrue('Test3' in entries)
698
699      self.assertRaises(
700          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
701      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
702        entries = check_zip.namelist()
703        self.assertTrue('Test1' in entries)
704        self.assertFalse('Test2' in entries)
705        self.assertTrue('Test3' in entries)
706
707      common.ZipDelete(zip_file.name, ['Test3'])
708      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
709        entries = check_zip.namelist()
710        self.assertTrue('Test1' in entries)
711        self.assertFalse('Test2' in entries)
712        self.assertFalse('Test3' in entries)
713
714      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
715      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
716        entries = check_zip.namelist()
717        self.assertFalse('Test1' in entries)
718        self.assertFalse('Test2' in entries)
719        self.assertFalse('Test3' in entries)
720    finally:
721      os.remove(zip_file.name)
722
723  @staticmethod
724  def _test_UnzipTemp_createZipFile():
725    zip_file = common.MakeTempFile(suffix='.zip')
726    output_zip = zipfile.ZipFile(
727        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
728    contents = os.urandom(1024)
729    with tempfile.NamedTemporaryFile() as entry_file:
730      entry_file.write(contents)
731      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
732      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
733      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
734      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
735      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
736      common.ZipClose(output_zip)
737    common.ZipClose(output_zip)
738    return zip_file
739
740  @test_utils.SkipIfExternalToolsUnavailable()
741  def test_UnzipTemp(self):
742    zip_file = self._test_UnzipTemp_createZipFile()
743    unzipped_dir = common.UnzipTemp(zip_file)
744    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
745    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
746    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
747    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
748    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
749
750  @test_utils.SkipIfExternalToolsUnavailable()
751  def test_UnzipTemp_withPatterns(self):
752    zip_file = self._test_UnzipTemp_createZipFile()
753
754    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
755    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
756    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
757    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
758    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
759    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
760
761    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
762    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
763    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
764    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
765    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
766    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
767
768    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
769    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
770    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
771    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
772    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
773    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
774
775    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
776    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
777    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
778    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
779    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
780    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
781
782  def test_UnzipTemp_withEmptyPatterns(self):
783    zip_file = self._test_UnzipTemp_createZipFile()
784    unzipped_dir = common.UnzipTemp(zip_file, [])
785    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
786    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
787    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
788    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
789    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
790
791  @test_utils.SkipIfExternalToolsUnavailable()
792  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
793    zip_file = self._test_UnzipTemp_createZipFile()
794    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
795    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
796    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
797    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
798    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
799    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
800
801  def test_UnzipTemp_withNoMatchingPatterns(self):
802    zip_file = self._test_UnzipTemp_createZipFile()
803    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
804    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
805    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
806    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
807    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
808    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
809
810
811class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
812  """Tests the APK utils related functions."""
813
814  APKCERTS_TXT1 = (
815      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
816      ' private_key="certs/devkey.pk8"\n'
817      'name="Settings.apk"'
818      ' certificate="build/make/target/product/security/platform.x509.pem"'
819      ' private_key="build/make/target/product/security/platform.pk8"\n'
820      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
821  )
822
823  APKCERTS_CERTMAP1 = {
824      'RecoveryLocalizer.apk' : 'certs/devkey',
825      'Settings.apk' : 'build/make/target/product/security/platform',
826      'TV.apk' : 'PRESIGNED',
827  }
828
829  APKCERTS_TXT2 = (
830      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
831      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
832      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
833      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
834      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
835      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
836      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
837      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
838  )
839
840  APKCERTS_CERTMAP2 = {
841      'Compressed1.apk' : 'certs/compressed1',
842      'Compressed2a.apk' : 'certs/compressed2',
843      'Compressed2b.apk' : 'certs/compressed2',
844      'Compressed3.apk' : 'certs/compressed3',
845  }
846
847  APKCERTS_TXT3 = (
848      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
849      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
850  )
851
852  APKCERTS_CERTMAP3 = {
853      'Compressed4.apk' : 'certs/compressed4',
854  }
855
856  # Test parsing with no optional fields, both optional fields, and only the
857  # partition optional field.
858  APKCERTS_TXT4 = (
859      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
860      ' private_key="certs/devkey.pk8"\n'
861      'name="Settings.apk"'
862      ' certificate="build/make/target/product/security/platform.x509.pem"'
863      ' private_key="build/make/target/product/security/platform.pk8"'
864      ' compressed="gz" partition="system"\n'
865      'name="TV.apk" certificate="PRESIGNED" private_key=""'
866      ' partition="product"\n'
867  )
868
869  APKCERTS_CERTMAP4 = {
870      'RecoveryLocalizer.apk' : 'certs/devkey',
871      'Settings.apk' : 'build/make/target/product/security/platform',
872      'TV.apk' : 'PRESIGNED',
873  }
874
875  def setUp(self):
876    self.testdata_dir = test_utils.get_testdata_dir()
877
878  @staticmethod
879  def _write_apkcerts_txt(apkcerts_txt, additional=None):
880    if additional is None:
881      additional = []
882    target_files = common.MakeTempFile(suffix='.zip')
883    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
884      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
885      for entry in additional:
886        target_files_zip.writestr(entry, '')
887    return target_files
888
889  def test_ReadApkCerts_NoncompressedApks(self):
890    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
891    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
892      certmap, ext = common.ReadApkCerts(input_zip)
893
894    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
895    self.assertIsNone(ext)
896
897  def test_ReadApkCerts_CompressedApks(self):
898    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
899    # not stored in '.gz' format, so it shouldn't be considered as installed.
900    target_files = self._write_apkcerts_txt(
901        self.APKCERTS_TXT2,
902        ['Compressed1.apk.gz', 'Compressed3.apk'])
903
904    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
905      certmap, ext = common.ReadApkCerts(input_zip)
906
907    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
908    self.assertEqual('.gz', ext)
909
910    # Alternative case with '.xz'.
911    target_files = self._write_apkcerts_txt(
912        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
913
914    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
915      certmap, ext = common.ReadApkCerts(input_zip)
916
917    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
918    self.assertEqual('.xz', ext)
919
920  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
921    target_files = self._write_apkcerts_txt(
922        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
923        ['Compressed1.apk.gz', 'Compressed3.apk'])
924
925    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
926      certmap, ext = common.ReadApkCerts(input_zip)
927
928    certmap_merged = self.APKCERTS_CERTMAP1.copy()
929    certmap_merged.update(self.APKCERTS_CERTMAP2)
930    self.assertDictEqual(certmap_merged, certmap)
931    self.assertEqual('.gz', ext)
932
933  def test_ReadApkCerts_MultipleCompressionMethods(self):
934    target_files = self._write_apkcerts_txt(
935        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
936        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
937
938    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
939      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
940
941  def test_ReadApkCerts_MismatchingKeys(self):
942    malformed_apkcerts_txt = (
943        'name="App1.apk" certificate="certs/cert1.x509.pem"'
944        ' private_key="certs/cert2.pk8"\n'
945    )
946    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
947
948    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
949      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
950
951  def test_ReadApkCerts_WithWithoutOptionalFields(self):
952    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4)
953    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
954      certmap, ext = common.ReadApkCerts(input_zip)
955
956    self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap)
957    self.assertIsNone(ext)
958
959  def test_ExtractPublicKey(self):
960    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
961    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
962    with open(pubkey) as pubkey_fp:
963      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
964
965  def test_ExtractPublicKey_invalidInput(self):
966    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
967    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
968
969  @test_utils.SkipIfExternalToolsUnavailable()
970  def test_ExtractAvbPublicKey(self):
971    privkey = os.path.join(self.testdata_dir, 'testkey.key')
972    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
973    extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey)
974    extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey)
975    with open(extracted_from_privkey, 'rb') as privkey_fp, \
976        open(extracted_from_pubkey, 'rb') as pubkey_fp:
977      self.assertEqual(privkey_fp.read(), pubkey_fp.read())
978
979  def test_ParseCertificate(self):
980    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
981
982    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
983    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
984                      universal_newlines=False)
985    expected, _ = proc.communicate()
986    self.assertEqual(0, proc.returncode)
987
988    with open(cert) as cert_fp:
989      actual = common.ParseCertificate(cert_fp.read())
990    self.assertEqual(expected, actual)
991
992  @test_utils.SkipIfExternalToolsUnavailable()
993  def test_GetMinSdkVersion(self):
994    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
995    self.assertEqual('24', common.GetMinSdkVersion(test_app))
996
997  @test_utils.SkipIfExternalToolsUnavailable()
998  def test_GetMinSdkVersion_invalidInput(self):
999    self.assertRaises(
1000        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
1001
1002  @test_utils.SkipIfExternalToolsUnavailable()
1003  def test_GetMinSdkVersionInt(self):
1004    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
1005    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
1006
1007  @test_utils.SkipIfExternalToolsUnavailable()
1008  def test_GetMinSdkVersionInt_invalidInput(self):
1009    self.assertRaises(
1010        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
1011        {})
1012
1013
1014class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
1015
1016  def setUp(self):
1017    self.testdata_dir = test_utils.get_testdata_dir()
1018
1019  @test_utils.SkipIfExternalToolsUnavailable()
1020  def test_GetSparseImage_emptyBlockMapFile(self):
1021    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1022    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1023      target_files_zip.write(
1024          test_utils.construct_sparse_image([
1025              (0xCAC1, 6),
1026              (0xCAC3, 3),
1027              (0xCAC1, 4)]),
1028          arcname='IMAGES/system.img')
1029      target_files_zip.writestr('IMAGES/system.map', '')
1030      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1031      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1032
1033    tempdir = common.UnzipTemp(target_files)
1034    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1035      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1036
1037    self.assertDictEqual(
1038        {
1039            '__COPY': RangeSet("0"),
1040            '__NONZERO-0': RangeSet("1-5 9-12"),
1041        },
1042        sparse_image.file_map)
1043
1044  def test_PartitionMapFromTargetFiles(self):
1045    target_files_dir = common.MakeTempDir()
1046    os.makedirs(os.path.join(target_files_dir, 'SYSTEM'))
1047    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor'))
1048    os.makedirs(os.path.join(target_files_dir, 'PRODUCT'))
1049    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'product'))
1050    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor', 'odm'))
1051    os.makedirs(os.path.join(target_files_dir, 'VENDOR_DLKM'))
1052    partition_map = common.PartitionMapFromTargetFiles(target_files_dir)
1053    self.assertDictEqual(
1054        partition_map,
1055        {
1056            'system': 'SYSTEM',
1057            'vendor': 'SYSTEM/vendor',
1058            # Prefer PRODUCT over SYSTEM/product
1059            'product': 'PRODUCT',
1060            'odm': 'SYSTEM/vendor/odm',
1061            'vendor_dlkm': 'VENDOR_DLKM',
1062            # No system_ext or odm_dlkm
1063        })
1064
1065  def test_SharedUidPartitionViolations(self):
1066    uid_dict = {
1067        'android.uid.phone': {
1068            'system': ['system_phone.apk'],
1069            'system_ext': ['system_ext_phone.apk'],
1070        },
1071        'android.uid.wifi': {
1072            'vendor': ['vendor_wifi.apk'],
1073            'odm': ['odm_wifi.apk'],
1074        },
1075    }
1076    errors = common.SharedUidPartitionViolations(
1077        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1078    self.assertEqual(errors, [])
1079
1080  def test_SharedUidPartitionViolations_Violation(self):
1081    uid_dict = {
1082        'android.uid.phone': {
1083            'system': ['system_phone.apk'],
1084            'vendor': ['vendor_phone.apk'],
1085        },
1086    }
1087    errors = common.SharedUidPartitionViolations(
1088        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1089    self.assertIn(
1090        ('APK sharedUserId "android.uid.phone" found across partition groups '
1091         'in partitions "system,vendor"'), errors)
1092
1093  def test_GetSparseImage_missingImageFile(self):
1094    self.assertRaises(
1095        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
1096        None, False)
1097    self.assertRaises(
1098        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
1099        None, False)
1100
1101  @test_utils.SkipIfExternalToolsUnavailable()
1102  def test_GetSparseImage_missingBlockMapFile(self):
1103    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1104    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1105      target_files_zip.write(
1106          test_utils.construct_sparse_image([
1107              (0xCAC1, 6),
1108              (0xCAC3, 3),
1109              (0xCAC1, 4)]),
1110          arcname='IMAGES/system.img')
1111      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1112      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1113
1114    tempdir = common.UnzipTemp(target_files)
1115    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1116      self.assertRaises(
1117          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1118          False)
1119
1120  @test_utils.SkipIfExternalToolsUnavailable()
1121  def test_GetSparseImage_sharedBlocks_notAllowed(self):
1122    """Tests the case of having overlapping blocks but disallowed."""
1123    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1124    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1125      target_files_zip.write(
1126          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1127          arcname='IMAGES/system.img')
1128      # Block 10 is shared between two files.
1129      target_files_zip.writestr(
1130          'IMAGES/system.map',
1131          '\n'.join([
1132              '/system/file1 1-5 9-10',
1133              '/system/file2 10-12']))
1134      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1135      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1136
1137    tempdir = common.UnzipTemp(target_files)
1138    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1139      self.assertRaises(
1140          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1141          False)
1142
1143  @test_utils.SkipIfExternalToolsUnavailable()
1144  def test_GetSparseImage_sharedBlocks_allowed(self):
1145    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
1146    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1147    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1148      # Construct an image with a care_map of "0-5 9-12".
1149      target_files_zip.write(
1150          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1151          arcname='IMAGES/system.img')
1152      # Block 10 is shared between two files.
1153      target_files_zip.writestr(
1154          'IMAGES/system.map',
1155          '\n'.join([
1156              '/system/file1 1-5 9-10',
1157              '/system/file2 10-12']))
1158      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1159      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1160
1161    tempdir = common.UnzipTemp(target_files)
1162    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1163      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
1164
1165    self.assertDictEqual(
1166        {
1167            '__COPY': RangeSet("0"),
1168            '__NONZERO-0': RangeSet("6-8 13-15"),
1169            '/system/file1': RangeSet("1-5 9-10"),
1170            '/system/file2': RangeSet("11-12"),
1171        },
1172        sparse_image.file_map)
1173
1174    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
1175    # 'incomplete'.
1176    self.assertTrue(
1177        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
1178    self.assertNotIn(
1179        'incomplete', sparse_image.file_map['/system/file2'].extra)
1180
1181    # '/system/file1' will only contain one field -- a copy of the input text.
1182    self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra))
1183
1184    # Meta entries should not have any extra tag.
1185    self.assertFalse(sparse_image.file_map['__COPY'].extra)
1186    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
1187
1188  @test_utils.SkipIfExternalToolsUnavailable()
1189  def test_GetSparseImage_incompleteRanges(self):
1190    """Tests the case of ext4 images with holes."""
1191    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1192    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1193      target_files_zip.write(
1194          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1195          arcname='IMAGES/system.img')
1196      target_files_zip.writestr(
1197          'IMAGES/system.map',
1198          '\n'.join([
1199              '/system/file1 1-5 9-10',
1200              '/system/file2 11-12']))
1201      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1202      # '/system/file2' has less blocks listed (2) than actual (3).
1203      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1204
1205    tempdir = common.UnzipTemp(target_files)
1206    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1207      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1208
1209    self.assertEqual(
1210        '1-5 9-10',
1211        sparse_image.file_map['/system/file1'].extra['text_str'])
1212    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
1213
1214  @test_utils.SkipIfExternalToolsUnavailable()
1215  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
1216    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1217    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1218      target_files_zip.write(
1219          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1220          arcname='IMAGES/system.img')
1221      target_files_zip.writestr(
1222          'IMAGES/system.map',
1223          '\n'.join([
1224              '//system/file1 1-5 9-10',
1225              '//system/file2 11-12',
1226              '/system/app/file3 13-15']))
1227      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1228      # '/system/file2' has less blocks listed (2) than actual (3).
1229      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1230      # '/system/app/file3' has less blocks listed (3) than actual (4).
1231      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
1232
1233    tempdir = common.UnzipTemp(target_files)
1234    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1235      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1236
1237    self.assertEqual(
1238        '1-5 9-10',
1239        sparse_image.file_map['//system/file1'].extra['text_str'])
1240    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
1241    self.assertTrue(
1242        sparse_image.file_map['/system/app/file3'].extra['incomplete'])
1243
1244  @test_utils.SkipIfExternalToolsUnavailable()
1245  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
1246    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1247    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1248      target_files_zip.write(
1249          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1250          arcname='IMAGES/system.img')
1251      target_files_zip.writestr(
1252          'IMAGES/system.map',
1253          '\n'.join([
1254              '//system/file1 1-5 9-10',
1255              '//init.rc 13-15']))
1256      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1257      # '/init.rc' has less blocks listed (3) than actual (4).
1258      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
1259
1260    tempdir = common.UnzipTemp(target_files)
1261    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1262      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1263
1264    self.assertEqual(
1265        '1-5 9-10',
1266        sparse_image.file_map['//system/file1'].extra['text_str'])
1267    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
1268
1269  @test_utils.SkipIfExternalToolsUnavailable()
1270  def test_GetSparseImage_fileNotFound(self):
1271    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1272    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1273      target_files_zip.write(
1274          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1275          arcname='IMAGES/system.img')
1276      target_files_zip.writestr(
1277          'IMAGES/system.map',
1278          '\n'.join([
1279              '//system/file1 1-5 9-10',
1280              '//system/file2 11-12']))
1281      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1282
1283    tempdir = common.UnzipTemp(target_files)
1284    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1285      self.assertRaises(
1286          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1287          False)
1288
1289  @test_utils.SkipIfExternalToolsUnavailable()
1290  def test_GetAvbChainedPartitionArg(self):
1291    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1292    info_dict = {
1293        'avb_avbtool': 'avbtool',
1294        'avb_system_key_path': pubkey,
1295        'avb_system_rollback_index_location': 2,
1296    }
1297    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
1298    self.assertEqual(3, len(args))
1299    self.assertEqual('system', args[0])
1300    self.assertEqual('2', args[1])
1301    self.assertTrue(os.path.exists(args[2]))
1302
1303  @test_utils.SkipIfExternalToolsUnavailable()
1304  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
1305    key = os.path.join(self.testdata_dir, 'testkey.key')
1306    info_dict = {
1307        'avb_avbtool': 'avbtool',
1308        'avb_product_key_path': key,
1309        'avb_product_rollback_index_location': 2,
1310    }
1311    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
1312    self.assertEqual(3, len(args))
1313    self.assertEqual('product', args[0])
1314    self.assertEqual('2', args[1])
1315    self.assertTrue(os.path.exists(args[2]))
1316
1317  @test_utils.SkipIfExternalToolsUnavailable()
1318  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
1319    info_dict = {
1320        'avb_avbtool': 'avbtool',
1321        'avb_system_key_path': 'does-not-exist',
1322        'avb_system_rollback_index_location': 2,
1323    }
1324    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1325    args = common.GetAvbChainedPartitionArg(
1326        'system', info_dict, pubkey).split(':')
1327    self.assertEqual(3, len(args))
1328    self.assertEqual('system', args[0])
1329    self.assertEqual('2', args[1])
1330    self.assertTrue(os.path.exists(args[2]))
1331
1332  @test_utils.SkipIfExternalToolsUnavailable()
1333  def test_GetAvbChainedPartitionArg_invalidKey(self):
1334    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
1335    info_dict = {
1336        'avb_avbtool': 'avbtool',
1337        'avb_system_key_path': pubkey,
1338        'avb_system_rollback_index_location': 2,
1339    }
1340    self.assertRaises(
1341        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
1342        info_dict)
1343
1344  INFO_DICT_DEFAULT = {
1345      'recovery_api_version': 3,
1346      'fstab_version': 2,
1347      'system_root_image': 'true',
1348      'no_recovery' : 'true',
1349      'recovery_as_boot': 'true',
1350  }
1351
1352  def test_LoadListFromFile(self):
1353    file_path = os.path.join(self.testdata_dir,
1354                             'merge_config_framework_item_list')
1355    contents = common.LoadListFromFile(file_path)
1356    expected_contents = [
1357        'META/apkcerts.txt',
1358        'META/filesystem_config.txt',
1359        'META/root_filesystem_config.txt',
1360        'META/system_manifest.xml',
1361        'META/system_matrix.xml',
1362        'META/update_engine_config.txt',
1363        'PRODUCT/*',
1364        'ROOT/*',
1365        'SYSTEM/*',
1366    ]
1367    self.assertEqual(sorted(contents), sorted(expected_contents))
1368
1369  @staticmethod
1370  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
1371    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1372    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1373      info_values = ''.join(
1374          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
1375      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
1376
1377      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
1378      if info_dict.get('system_root_image') == 'true':
1379        fstab_values = FSTAB_TEMPLATE.format('/')
1380      else:
1381        fstab_values = FSTAB_TEMPLATE.format('/system')
1382      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)
1383
1384      common.ZipWriteStr(
1385          target_files_zip, 'META/file_contexts', 'file-contexts')
1386    return target_files
1387
1388  def test_LoadInfoDict(self):
1389    target_files = self._test_LoadInfoDict_createTargetFiles(
1390        self.INFO_DICT_DEFAULT,
1391        'BOOT/RAMDISK/system/etc/recovery.fstab')
1392    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1393      loaded_dict = common.LoadInfoDict(target_files_zip)
1394      self.assertEqual(3, loaded_dict['recovery_api_version'])
1395      self.assertEqual(2, loaded_dict['fstab_version'])
1396      self.assertIn('/', loaded_dict['fstab'])
1397      self.assertIn('/system', loaded_dict['fstab'])
1398
1399  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
1400    target_files = self._test_LoadInfoDict_createTargetFiles(
1401        self.INFO_DICT_DEFAULT,
1402        'BOOT/RAMDISK/etc/recovery.fstab')
1403    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1404      loaded_dict = common.LoadInfoDict(target_files_zip)
1405      self.assertEqual(3, loaded_dict['recovery_api_version'])
1406      self.assertEqual(2, loaded_dict['fstab_version'])
1407      self.assertIn('/', loaded_dict['fstab'])
1408      self.assertIn('/system', loaded_dict['fstab'])
1409
1410  @test_utils.SkipIfExternalToolsUnavailable()
1411  def test_LoadInfoDict_dirInput(self):
1412    target_files = self._test_LoadInfoDict_createTargetFiles(
1413        self.INFO_DICT_DEFAULT,
1414        'BOOT/RAMDISK/system/etc/recovery.fstab')
1415    unzipped = common.UnzipTemp(target_files)
1416    loaded_dict = common.LoadInfoDict(unzipped)
1417    self.assertEqual(3, loaded_dict['recovery_api_version'])
1418    self.assertEqual(2, loaded_dict['fstab_version'])
1419    self.assertIn('/', loaded_dict['fstab'])
1420    self.assertIn('/system', loaded_dict['fstab'])
1421
1422  @test_utils.SkipIfExternalToolsUnavailable()
1423  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
1424    target_files = self._test_LoadInfoDict_createTargetFiles(
1425        self.INFO_DICT_DEFAULT,
1426        'BOOT/RAMDISK/system/etc/recovery.fstab')
1427    unzipped = common.UnzipTemp(target_files)
1428    loaded_dict = common.LoadInfoDict(unzipped)
1429    self.assertEqual(3, loaded_dict['recovery_api_version'])
1430    self.assertEqual(2, loaded_dict['fstab_version'])
1431    self.assertIn('/', loaded_dict['fstab'])
1432    self.assertIn('/system', loaded_dict['fstab'])
1433
1434  def test_LoadInfoDict_systemRootImageFalse(self):
1435    # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
1436    # launched prior to P will likely have this config.
1437    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1438    del info_dict['no_recovery']
1439    del info_dict['system_root_image']
1440    del info_dict['recovery_as_boot']
1441    target_files = self._test_LoadInfoDict_createTargetFiles(
1442        info_dict,
1443        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1444    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1445      loaded_dict = common.LoadInfoDict(target_files_zip)
1446      self.assertEqual(3, loaded_dict['recovery_api_version'])
1447      self.assertEqual(2, loaded_dict['fstab_version'])
1448      self.assertNotIn('/', loaded_dict['fstab'])
1449      self.assertIn('/system', loaded_dict['fstab'])
1450
1451  def test_LoadInfoDict_recoveryAsBootFalse(self):
1452    # Devices using system-as-root, but with standalone recovery image. Non-A/B
1453    # devices launched since P will likely have this config.
1454    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1455    del info_dict['no_recovery']
1456    del info_dict['recovery_as_boot']
1457    target_files = self._test_LoadInfoDict_createTargetFiles(
1458        info_dict,
1459        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1460    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1461      loaded_dict = common.LoadInfoDict(target_files_zip)
1462      self.assertEqual(3, loaded_dict['recovery_api_version'])
1463      self.assertEqual(2, loaded_dict['fstab_version'])
1464      self.assertIn('/', loaded_dict['fstab'])
1465      self.assertIn('/system', loaded_dict['fstab'])
1466
1467  def test_LoadInfoDict_noRecoveryTrue(self):
1468    # Device doesn't have a recovery partition at all.
1469    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1470    del info_dict['recovery_as_boot']
1471    target_files = self._test_LoadInfoDict_createTargetFiles(
1472        info_dict,
1473        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1474    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1475      loaded_dict = common.LoadInfoDict(target_files_zip)
1476      self.assertEqual(3, loaded_dict['recovery_api_version'])
1477      self.assertEqual(2, loaded_dict['fstab_version'])
1478      self.assertIsNone(loaded_dict['fstab'])
1479
1480  @test_utils.SkipIfExternalToolsUnavailable()
1481  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
1482    target_files = self._test_LoadInfoDict_createTargetFiles(
1483        self.INFO_DICT_DEFAULT,
1484        'BOOT/RAMDISK/system/etc/recovery.fstab')
1485    common.ZipDelete(target_files, 'META/misc_info.txt')
1486    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1487      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
1488
1489  @test_utils.SkipIfExternalToolsUnavailable()
1490  def test_LoadInfoDict_repacking(self):
1491    target_files = self._test_LoadInfoDict_createTargetFiles(
1492        self.INFO_DICT_DEFAULT,
1493        'BOOT/RAMDISK/system/etc/recovery.fstab')
1494    unzipped = common.UnzipTemp(target_files)
1495    loaded_dict = common.LoadInfoDict(unzipped, True)
1496    self.assertEqual(3, loaded_dict['recovery_api_version'])
1497    self.assertEqual(2, loaded_dict['fstab_version'])
1498    self.assertIn('/', loaded_dict['fstab'])
1499    self.assertIn('/system', loaded_dict['fstab'])
1500    self.assertEqual(
1501        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
1502    self.assertEqual(
1503        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
1504        loaded_dict['root_fs_config'])
1505
1506  def test_LoadInfoDict_repackingWithZipFileInput(self):
1507    target_files = self._test_LoadInfoDict_createTargetFiles(
1508        self.INFO_DICT_DEFAULT,
1509        'BOOT/RAMDISK/system/etc/recovery.fstab')
1510    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1511      self.assertRaises(
1512          AssertionError, common.LoadInfoDict, target_files_zip, True)
1513
1514  def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self):
1515    framework_dict = {
1516        'use_dynamic_partitions': 'true',
1517        'super_partition_groups': 'group_a',
1518        'dynamic_partition_list': 'system',
1519        'super_group_a_partition_list': 'system',
1520    }
1521    vendor_dict = {
1522        'use_dynamic_partitions': 'true',
1523        'super_partition_groups': 'group_a group_b',
1524        'dynamic_partition_list': 'vendor product',
1525        'super_block_devices': 'super',
1526        'super_super_device_size': '3000',
1527        'super_group_a_partition_list': 'vendor',
1528        'super_group_a_group_size': '1000',
1529        'super_group_b_partition_list': 'product',
1530        'super_group_b_group_size': '2000',
1531    }
1532    merged_dict = common.MergeDynamicPartitionInfoDicts(
1533        framework_dict=framework_dict,
1534        vendor_dict=vendor_dict)
1535    expected_merged_dict = {
1536        'use_dynamic_partitions': 'true',
1537        'super_partition_groups': 'group_a group_b',
1538        'dynamic_partition_list': 'product system vendor',
1539        'super_block_devices': 'super',
1540        'super_super_device_size': '3000',
1541        'super_group_a_partition_list': 'system vendor',
1542        'super_group_a_group_size': '1000',
1543        'super_group_b_partition_list': 'product',
1544        'super_group_b_group_size': '2000',
1545    }
1546    self.assertEqual(merged_dict, expected_merged_dict)
1547
1548  def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self):
1549    framework_dict = {
1550        'use_dynamic_partitions': 'true',
1551        'super_partition_groups': 'group_a',
1552        'dynamic_partition_list': 'system',
1553        'super_group_a_partition_list': 'system',
1554        'super_group_a_group_size': '5000',
1555    }
1556    vendor_dict = {
1557        'use_dynamic_partitions': 'true',
1558        'super_partition_groups': 'group_a group_b',
1559        'dynamic_partition_list': 'vendor product',
1560        'super_group_a_partition_list': 'vendor',
1561        'super_group_a_group_size': '1000',
1562        'super_group_b_partition_list': 'product',
1563        'super_group_b_group_size': '2000',
1564    }
1565    merged_dict = common.MergeDynamicPartitionInfoDicts(
1566        framework_dict=framework_dict,
1567        vendor_dict=vendor_dict)
1568    expected_merged_dict = {
1569        'use_dynamic_partitions': 'true',
1570        'super_partition_groups': 'group_a group_b',
1571        'dynamic_partition_list': 'product system vendor',
1572        'super_group_a_partition_list': 'system vendor',
1573        'super_group_a_group_size': '1000',
1574        'super_group_b_partition_list': 'product',
1575        'super_group_b_group_size': '2000',
1576    }
1577    self.assertEqual(merged_dict, expected_merged_dict)
1578
1579  def test_GetAvbPartitionArg(self):
1580    info_dict = {}
1581    cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict)
1582    self.assertEqual(
1583        ['--include_descriptors_from_image', '/path/to/system.img'], cmd)
1584
1585  @test_utils.SkipIfExternalToolsUnavailable()
1586  def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self):
1587    testdata_dir = test_utils.get_testdata_dir()
1588    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1589    info_dict = {
1590        'avb_avbtool': 'avbtool',
1591        'avb_vendor_key_path': pubkey,
1592        'avb_vendor_rollback_index_location': 5,
1593    }
1594    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
1595    self.assertEqual(2, len(cmd))
1596    self.assertEqual('--chain_partition', cmd[0])
1597    chained_partition_args = cmd[1].split(':')
1598    self.assertEqual(3, len(chained_partition_args))
1599    self.assertEqual('vendor', chained_partition_args[0])
1600    self.assertEqual('5', chained_partition_args[1])
1601    self.assertTrue(os.path.exists(chained_partition_args[2]))
1602
1603  @test_utils.SkipIfExternalToolsUnavailable()
1604  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
1605    testdata_dir = test_utils.get_testdata_dir()
1606    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1607    info_dict = {
1608        'avb_avbtool': 'avbtool',
1609        'avb_recovery_key_path': pubkey,
1610        'avb_recovery_rollback_index_location': 3,
1611    }
1612    cmd = common.GetAvbPartitionArg(
1613        'recovery', '/path/to/recovery.img', info_dict)
1614    self.assertFalse(cmd)
1615
1616  @test_utils.SkipIfExternalToolsUnavailable()
1617  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self):
1618    testdata_dir = test_utils.get_testdata_dir()
1619    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1620    info_dict = {
1621        'ab_update': 'true',
1622        'avb_avbtool': 'avbtool',
1623        'avb_recovery_key_path': pubkey,
1624        'avb_recovery_rollback_index_location': 3,
1625    }
1626    cmd = common.GetAvbPartitionArg(
1627        'recovery', '/path/to/recovery.img', info_dict)
1628    self.assertEqual(2, len(cmd))
1629    self.assertEqual('--chain_partition', cmd[0])
1630    chained_partition_args = cmd[1].split(':')
1631    self.assertEqual(3, len(chained_partition_args))
1632    self.assertEqual('recovery', chained_partition_args[0])
1633    self.assertEqual('3', chained_partition_args[1])
1634    self.assertTrue(os.path.exists(chained_partition_args[2]))
1635
1636  def test_GenerateGkiCertificate_KeyPathNotFound(self):
1637    pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem')
1638    self.assertFalse(os.path.exists(pubkey))
1639
1640    common.OPTIONS.info_dict = {
1641        'gki_signing_key_path': pubkey,
1642        'gki_signing_algorithm': 'SHA256_RSA4096',
1643        'gki_signing_signature_args': '--prop foo:bar',
1644    }
1645    test_file = tempfile.NamedTemporaryFile()
1646    self.assertRaises(common.ExternalError, common._GenerateGkiCertificate,
1647                      test_file.name, 'generic_kernel')
1648
1649  def test_GenerateGkiCertificate_SearchKeyPathNotFound(self):
1650    pubkey = 'no_testkey_gki.pem'
1651    self.assertFalse(os.path.exists(pubkey))
1652
1653    # Tests it should raise ExternalError if no key found under
1654    # OPTIONS.search_path.
1655    search_path_dir = common.MakeTempDir()
1656    search_pubkey = os.path.join(search_path_dir, pubkey)
1657    self.assertFalse(os.path.exists(search_pubkey))
1658
1659    common.OPTIONS.search_path = search_path_dir
1660    common.OPTIONS.info_dict = {
1661        'gki_signing_key_path': pubkey,
1662        'gki_signing_algorithm': 'SHA256_RSA4096',
1663        'gki_signing_signature_args': '--prop foo:bar',
1664    }
1665    test_file = tempfile.NamedTemporaryFile()
1666    self.assertRaises(common.ExternalError, common._GenerateGkiCertificate,
1667                      test_file.name, 'generic_kernel')
1668
1669class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
1670  """Checks the format of install-recovery.sh.
1671
1672  Its format should match between common.py and validate_target_files.py.
1673  """
1674
1675  def setUp(self):
1676    self._tempdir = common.MakeTempDir()
1677    # Create a fake dict that contains the fstab info for boot&recovery.
1678    self._info = {"fstab" : {}}
1679    fake_fstab = [
1680        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
1681        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
1682    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
1683    # Construct the gzipped recovery.img and boot.img
1684    self.recovery_data = bytearray([
1685        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
1686        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
1687        0x08, 0x00, 0x00, 0x00
1688    ])
1689    # echo -n "boot" | gzip -f | hd
1690    self.boot_data = bytearray([
1691        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
1692        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
1693    ])
1694
1695  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
1696    loc = os.path.join(self._tempdir, prefix, name)
1697    if not os.path.exists(os.path.dirname(loc)):
1698      os.makedirs(os.path.dirname(loc))
1699    with open(loc, "wb") as f:
1700      f.write(data)
1701
1702  def test_full_recovery(self):
1703    recovery_image = common.File("recovery.img", self.recovery_data)
1704    boot_image = common.File("boot.img", self.boot_data)
1705    self._info["full_recovery_image"] = "true"
1706
1707    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1708                             recovery_image, boot_image, self._info)
1709    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1710                                                        self._info)
1711
1712  @test_utils.SkipIfExternalToolsUnavailable()
1713  def test_recovery_from_boot(self):
1714    recovery_image = common.File("recovery.img", self.recovery_data)
1715    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
1716    boot_image = common.File("boot.img", self.boot_data)
1717    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
1718
1719    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1720                             recovery_image, boot_image, self._info)
1721    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1722                                                        self._info)
1723    # Validate 'recovery-from-boot' with bonus argument.
1724    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
1725    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1726                             recovery_image, boot_image, self._info)
1727    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1728                                                        self._info)
1729
1730
1731class MockBlockDifference(object):
1732
1733  def __init__(self, partition, tgt, src=None):
1734    self.partition = partition
1735    self.tgt = tgt
1736    self.src = src
1737
1738  def WriteScript(self, script, _, progress=None,
1739                  write_verify_script=False):
1740    if progress:
1741      script.AppendExtra("progress({})".format(progress))
1742    script.AppendExtra("patch({});".format(self.partition))
1743    if write_verify_script:
1744      self.WritePostInstallVerifyScript(script)
1745
1746  def WritePostInstallVerifyScript(self, script):
1747    script.AppendExtra("verify({});".format(self.partition))
1748
1749
1750class FakeSparseImage(object):
1751
1752  def __init__(self, size):
1753    self.blocksize = 4096
1754    self.total_blocks = size // 4096
1755    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
1756
1757
1758class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
1759
1760  @staticmethod
1761  def get_op_list(output_path):
1762    with zipfile.ZipFile(output_path, allowZip64=True) as output_zip:
1763      with output_zip.open('dynamic_partitions_op_list') as op_list:
1764        return [line.decode().strip() for line in op_list.readlines()
1765                if not line.startswith(b'#')]
1766
1767  def setUp(self):
1768    self.script = test_utils.MockScriptWriter()
1769    self.output_path = common.MakeTempFile(suffix='.zip')
1770
1771  def test_full(self):
1772    target_info = common.LoadDictionaryFromLines("""
1773dynamic_partition_list=system vendor
1774super_partition_groups=group_foo
1775super_group_foo_group_size={group_size}
1776super_group_foo_partition_list=system vendor
1777""".format(group_size=4 * GiB).split("\n"))
1778    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
1779                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
1780
1781    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
1782    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1783      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1784
1785    self.assertEqual(str(self.script).strip(), """
1786assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
1787patch(system);
1788verify(system);
1789unmap_partition("system");
1790patch(vendor);
1791verify(vendor);
1792unmap_partition("vendor");
1793""".strip())
1794
1795    lines = self.get_op_list(self.output_path)
1796
1797    remove_all_groups = lines.index("remove_all_groups")
1798    add_group = lines.index("add_group group_foo 4294967296")
1799    add_vendor = lines.index("add vendor group_foo")
1800    add_system = lines.index("add system group_foo")
1801    resize_vendor = lines.index("resize vendor 1073741824")
1802    resize_system = lines.index("resize system 3221225472")
1803
1804    self.assertLess(remove_all_groups, add_group,
1805                    "Should add groups after removing all groups")
1806    self.assertLess(add_group, min(add_vendor, add_system),
1807                    "Should add partitions after adding group")
1808    self.assertLess(add_system, resize_system,
1809                    "Should resize system after adding it")
1810    self.assertLess(add_vendor, resize_vendor,
1811                    "Should resize vendor after adding it")
1812
1813  def test_inc_groups(self):
1814    source_info = common.LoadDictionaryFromLines("""
1815super_partition_groups=group_foo group_bar group_baz
1816super_group_foo_group_size={group_foo_size}
1817super_group_bar_group_size={group_bar_size}
1818""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
1819    target_info = common.LoadDictionaryFromLines("""
1820super_partition_groups=group_foo group_baz group_qux
1821super_group_foo_group_size={group_foo_size}
1822super_group_baz_group_size={group_baz_size}
1823super_group_qux_group_size={group_qux_size}
1824""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
1825           group_qux_size=1 * GiB).split("\n"))
1826
1827    dp_diff = common.DynamicPartitionsDifference(target_info,
1828                                                 block_diffs=[],
1829                                                 source_info_dict=source_info)
1830    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1831      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1832
1833    lines = self.get_op_list(self.output_path)
1834
1835    removed = lines.index("remove_group group_bar")
1836    shrunk = lines.index("resize_group group_foo 3221225472")
1837    grown = lines.index("resize_group group_baz 4294967296")
1838    added = lines.index("add_group group_qux 1073741824")
1839
1840    self.assertLess(max(removed, shrunk),
1841                    min(grown, added),
1842                    "ops that remove / shrink partitions must precede ops that "
1843                    "grow / add partitions")
1844
1845  def test_incremental(self):
1846    source_info = common.LoadDictionaryFromLines("""
1847dynamic_partition_list=system vendor product system_ext
1848super_partition_groups=group_foo
1849super_group_foo_group_size={group_foo_size}
1850super_group_foo_partition_list=system vendor product system_ext
1851""".format(group_foo_size=4 * GiB).split("\n"))
1852    target_info = common.LoadDictionaryFromLines("""
1853dynamic_partition_list=system vendor product odm
1854super_partition_groups=group_foo group_bar
1855super_group_foo_group_size={group_foo_size}
1856super_group_foo_partition_list=system vendor odm
1857super_group_bar_group_size={group_bar_size}
1858super_group_bar_partition_list=product
1859""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
1860
1861    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
1862                                       src=FakeSparseImage(1024 * MiB)),
1863                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
1864                                       src=FakeSparseImage(1024 * MiB)),
1865                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
1866                                       src=FakeSparseImage(1024 * MiB)),
1867                   MockBlockDifference("system_ext", None,
1868                                       src=FakeSparseImage(1024 * MiB)),
1869                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
1870                                       src=None)]
1871
1872    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1873                                                 source_info_dict=source_info)
1874    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1875      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1876
1877    metadata_idx = self.script.lines.index(
1878        'assert(update_dynamic_partitions(package_extract_file('
1879        '"dynamic_partitions_op_list")));')
1880    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
1881    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
1882    for p in ("product", "system", "odm"):
1883      patch_idx = self.script.lines.index("patch({});".format(p))
1884      verify_idx = self.script.lines.index("verify({});".format(p))
1885      self.assertLess(metadata_idx, patch_idx,
1886                      "Should patch {} after updating metadata".format(p))
1887      self.assertLess(patch_idx, verify_idx,
1888                      "Should verify {} after patching".format(p))
1889
1890    self.assertNotIn("patch(system_ext);", self.script.lines)
1891
1892    lines = self.get_op_list(self.output_path)
1893
1894    remove = lines.index("remove system_ext")
1895    move_product_out = lines.index("move product default")
1896    shrink = lines.index("resize vendor 536870912")
1897    shrink_group = lines.index("resize_group group_foo 3221225472")
1898    add_group_bar = lines.index("add_group group_bar 1073741824")
1899    add_odm = lines.index("add odm group_foo")
1900    grow_existing = lines.index("resize system 1610612736")
1901    grow_added = lines.index("resize odm 1073741824")
1902    move_product_in = lines.index("move product group_bar")
1903
1904    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
1905    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
1906
1907    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
1908                    "Must shrink group after partitions inside group are shrunk"
1909                    " / removed")
1910
1911    self.assertLess(add_group_bar, move_product_in,
1912                    "Must add partitions to group after group is added")
1913
1914    self.assertLess(max_idx_move_partition_out_foo,
1915                    min_idx_move_partition_in_foo,
1916                    "Must shrink partitions / remove partitions from group"
1917                    "before adding / moving partitions into group")
1918
1919  def test_remove_partition(self):
1920    source_info = common.LoadDictionaryFromLines("""
1921blockimgdiff_versions=3,4
1922use_dynamic_partitions=true
1923dynamic_partition_list=foo
1924super_partition_groups=group_foo
1925super_group_foo_group_size={group_foo_size}
1926super_group_foo_partition_list=foo
1927""".format(group_foo_size=4 * GiB).split("\n"))
1928    target_info = common.LoadDictionaryFromLines("""
1929blockimgdiff_versions=3,4
1930use_dynamic_partitions=true
1931super_partition_groups=group_foo
1932super_group_foo_group_size={group_foo_size}
1933""".format(group_foo_size=4 * GiB).split("\n"))
1934
1935    common.OPTIONS.info_dict = target_info
1936    common.OPTIONS.target_info_dict = target_info
1937    common.OPTIONS.source_info_dict = source_info
1938    common.OPTIONS.cache_size = 4 * 4096
1939
1940    block_diffs = [common.BlockDifference("foo", EmptyImage(),
1941                                          src=DataImage("source", pad=True))]
1942
1943    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1944                                                 source_info_dict=source_info)
1945    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1946      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1947
1948    self.assertNotIn("block_image_update", str(self.script),
1949                     "Removed partition should not be patched.")
1950
1951    lines = self.get_op_list(self.output_path)
1952    self.assertEqual(lines, ["remove foo"])
1953
1954
1955class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
1956  def setUp(self):
1957    self.odm_build_prop = [
1958        'ro.odm.build.date.utc=1578430045',
1959        'ro.odm.build.fingerprint='
1960        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1961        'ro.product.odm.device=coral',
1962        'import /odm/etc/build_${ro.boot.product.device_name}.prop',
1963    ]
1964
1965  @staticmethod
1966  def _BuildZipFile(entries):
1967    input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1968    with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip:
1969      for name, content in entries.items():
1970        input_zip.writestr(name, content)
1971
1972    return input_file
1973
1974  def test_parseBuildProps_noImportStatement(self):
1975    build_prop = [
1976        'ro.odm.build.date.utc=1578430045',
1977        'ro.odm.build.fingerprint='
1978        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1979        'ro.product.odm.device=coral',
1980    ]
1981    input_file = self._BuildZipFile({
1982        'ODM/etc/build.prop': '\n'.join(build_prop),
1983    })
1984
1985    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
1986      placeholder_values = {
1987          'ro.boot.product.device_name': ['std', 'pro']
1988      }
1989      partition_props = common.PartitionBuildProps.FromInputFile(
1990          input_zip, 'odm', placeholder_values)
1991
1992    self.assertEqual({
1993        'ro.odm.build.date.utc': '1578430045',
1994        'ro.odm.build.fingerprint':
1995        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1996        'ro.product.odm.device': 'coral',
1997    }, partition_props.build_props)
1998
1999    self.assertEqual(set(), partition_props.prop_overrides)
2000
2001  def test_parseBuildProps_singleImportStatement(self):
2002    build_std_prop = [
2003        'ro.product.odm.device=coral',
2004        'ro.product.odm.name=product1',
2005    ]
2006    build_pro_prop = [
2007        'ro.product.odm.device=coralpro',
2008        'ro.product.odm.name=product2',
2009    ]
2010
2011    input_file = self._BuildZipFile({
2012        'ODM/etc/build.prop': '\n'.join(self.odm_build_prop),
2013        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2014        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2015    })
2016
2017    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2018      placeholder_values = {
2019          'ro.boot.product.device_name': 'std'
2020      }
2021      partition_props = common.PartitionBuildProps.FromInputFile(
2022          input_zip, 'odm', placeholder_values)
2023
2024    self.assertEqual({
2025      'ro.odm.build.date.utc': '1578430045',
2026      'ro.odm.build.fingerprint':
2027      'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2028      'ro.product.odm.device': 'coral',
2029      'ro.product.odm.name': 'product1',
2030    }, partition_props.build_props)
2031
2032    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2033      placeholder_values = {
2034          'ro.boot.product.device_name': 'pro'
2035      }
2036      partition_props = common.PartitionBuildProps.FromInputFile(
2037          input_zip, 'odm', placeholder_values)
2038
2039    self.assertEqual({
2040        'ro.odm.build.date.utc': '1578430045',
2041        'ro.odm.build.fingerprint':
2042        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2043        'ro.product.odm.device': 'coralpro',
2044        'ro.product.odm.name': 'product2',
2045    }, partition_props.build_props)
2046
2047  def test_parseBuildProps_noPlaceHolders(self):
2048    build_prop = copy.copy(self.odm_build_prop)
2049    input_file = self._BuildZipFile({
2050        'ODM/etc/build.prop': '\n'.join(build_prop),
2051    })
2052
2053    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2054      partition_props = common.PartitionBuildProps.FromInputFile(
2055          input_zip, 'odm')
2056
2057    self.assertEqual({
2058        'ro.odm.build.date.utc': '1578430045',
2059        'ro.odm.build.fingerprint':
2060        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2061        'ro.product.odm.device': 'coral',
2062    }, partition_props.build_props)
2063
2064    self.assertEqual(set(), partition_props.prop_overrides)
2065
2066  def test_parseBuildProps_multipleImportStatements(self):
2067    build_prop = copy.deepcopy(self.odm_build_prop)
2068    build_prop.append(
2069        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2070
2071    build_std_prop = [
2072        'ro.product.odm.device=coral',
2073    ]
2074    build_pro_prop = [
2075        'ro.product.odm.device=coralpro',
2076    ]
2077
2078    product1_prop = [
2079        'ro.product.odm.name=product1',
2080        'ro.product.not_care=not_care',
2081    ]
2082
2083    product2_prop = [
2084        'ro.product.odm.name=product2',
2085        'ro.product.not_care=not_care',
2086    ]
2087
2088    input_file = self._BuildZipFile({
2089        'ODM/etc/build.prop': '\n'.join(build_prop),
2090        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2091        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2092        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2093        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2094    })
2095
2096    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2097      placeholder_values = {
2098          'ro.boot.product.device_name': 'std',
2099          'ro.boot.product.product_name': 'product1',
2100          'ro.boot.product.not_care': 'not_care',
2101      }
2102      partition_props = common.PartitionBuildProps.FromInputFile(
2103          input_zip, 'odm', placeholder_values)
2104
2105    self.assertEqual({
2106        'ro.odm.build.date.utc': '1578430045',
2107        'ro.odm.build.fingerprint':
2108        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2109        'ro.product.odm.device': 'coral',
2110        'ro.product.odm.name': 'product1'
2111    }, partition_props.build_props)
2112
2113    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2114      placeholder_values = {
2115          'ro.boot.product.device_name': 'pro',
2116          'ro.boot.product.product_name': 'product2',
2117          'ro.boot.product.not_care': 'not_care',
2118      }
2119      partition_props = common.PartitionBuildProps.FromInputFile(
2120          input_zip, 'odm', placeholder_values)
2121
2122    self.assertEqual({
2123        'ro.odm.build.date.utc': '1578430045',
2124        'ro.odm.build.fingerprint':
2125        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2126        'ro.product.odm.device': 'coralpro',
2127        'ro.product.odm.name': 'product2'
2128    }, partition_props.build_props)
2129
2130  def test_parseBuildProps_defineAfterOverride(self):
2131    build_prop = copy.deepcopy(self.odm_build_prop)
2132    build_prop.append('ro.product.odm.device=coral')
2133
2134    build_std_prop = [
2135        'ro.product.odm.device=coral',
2136    ]
2137    build_pro_prop = [
2138        'ro.product.odm.device=coralpro',
2139    ]
2140
2141    input_file = self._BuildZipFile({
2142        'ODM/etc/build.prop': '\n'.join(build_prop),
2143        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2144        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2145    })
2146
2147    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2148      placeholder_values = {
2149          'ro.boot.product.device_name': 'std',
2150      }
2151
2152      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2153                        input_zip, 'odm', placeholder_values)
2154
2155  def test_parseBuildProps_duplicateOverride(self):
2156    build_prop = copy.deepcopy(self.odm_build_prop)
2157    build_prop.append(
2158        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2159
2160    build_std_prop = [
2161        'ro.product.odm.device=coral',
2162        'ro.product.odm.name=product1',
2163    ]
2164    build_pro_prop = [
2165        'ro.product.odm.device=coralpro',
2166    ]
2167
2168    product1_prop = [
2169        'ro.product.odm.name=product1',
2170    ]
2171
2172    product2_prop = [
2173        'ro.product.odm.name=product2',
2174    ]
2175
2176    input_file = self._BuildZipFile({
2177        'ODM/etc/build.prop': '\n'.join(build_prop),
2178        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2179        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2180        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2181        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2182    })
2183
2184    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2185      placeholder_values = {
2186          'ro.boot.product.device_name': 'std',
2187          'ro.boot.product.product_name': 'product1',
2188      }
2189      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2190                        input_zip, 'odm', placeholder_values)
2191
2192  def test_partitionBuildProps_fromInputFile_deepcopy(self):
2193    build_prop = [
2194        'ro.odm.build.date.utc=1578430045',
2195        'ro.odm.build.fingerprint='
2196        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2197        'ro.product.odm.device=coral',
2198    ]
2199    input_file = self._BuildZipFile({
2200        'ODM/etc/build.prop': '\n'.join(build_prop),
2201    })
2202
2203    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2204      placeholder_values = {
2205          'ro.boot.product.device_name': ['std', 'pro']
2206      }
2207      partition_props = common.PartitionBuildProps.FromInputFile(
2208          input_zip, 'odm', placeholder_values)
2209
2210    copied_props = copy.deepcopy(partition_props)
2211    self.assertEqual({
2212      'ro.odm.build.date.utc': '1578430045',
2213      'ro.odm.build.fingerprint':
2214      'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2215      'ro.product.odm.device': 'coral',
2216    }, copied_props.build_props)
2217