• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import json
19import os
20import subprocess
21import tempfile
22import time
23import unittest
24import zipfile
25from hashlib import sha1
26
27import common
28import test_utils
29import validate_target_files
30from images import EmptyImage, DataImage
31from rangelib import RangeSet
32
33
34KiB = 1024
35MiB = 1024 * KiB
36GiB = 1024 * MiB
37
38
39def get_2gb_string():
40  size = int(2 * GiB + 1)
41  block_size = 4 * KiB
42  step_size = 4 * MiB
43  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
44  for _ in range(0, size, step_size):
45    yield os.urandom(block_size)
46    yield b'\0' * (step_size - block_size)
47
48
49class BuildInfoTest(test_utils.ReleaseToolsTestCase):
50
51  TEST_INFO_FINGERPRINT_DICT = {
52      'build.prop': common.PartitionBuildProps.FromDictionary(
53          'system', {
54              'ro.product.brand': 'product-brand',
55              'ro.product.name': 'product-name',
56              'ro.product.device': 'product-device',
57              'ro.build.version.release': 'version-release',
58              'ro.build.id': 'build-id',
59              'ro.build.version.incremental': 'version-incremental',
60              'ro.build.type': 'build-type',
61              'ro.build.tags': 'build-tags',
62              'ro.build.version.sdk': 30,
63          }
64      ),
65  }
66
67  TEST_INFO_DICT = {
68      'build.prop': common.PartitionBuildProps.FromDictionary(
69          'system', {
70              'ro.product.device': 'product-device',
71              'ro.product.name': 'product-name',
72              'ro.build.fingerprint': 'build-fingerprint',
73              'ro.build.foo': 'build-foo'}
74      ),
75      'system.build.prop': common.PartitionBuildProps.FromDictionary(
76          'system', {
77              'ro.product.system.brand': 'product-brand',
78              'ro.product.system.name': 'product-name',
79              'ro.product.system.device': 'product-device',
80              'ro.system.build.version.release': 'version-release',
81              'ro.system.build.id': 'build-id',
82              'ro.system.build.version.incremental': 'version-incremental',
83              'ro.system.build.type': 'build-type',
84              'ro.system.build.tags': 'build-tags',
85              'ro.system.build.foo': 'build-foo'}
86      ),
87      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
88          'vendor', {
89              'ro.product.vendor.brand': 'vendor-product-brand',
90              'ro.product.vendor.name': 'vendor-product-name',
91              'ro.product.vendor.device': 'vendor-product-device',
92              'ro.vendor.build.version.release': 'vendor-version-release',
93              'ro.vendor.build.id': 'vendor-build-id',
94              'ro.vendor.build.version.incremental':
95              'vendor-version-incremental',
96              'ro.vendor.build.type': 'vendor-build-type',
97              'ro.vendor.build.tags': 'vendor-build-tags'}
98      ),
99      'property1': 'value1',
100      'property2': 4096,
101  }
102
103  TEST_INFO_DICT_USES_OEM_PROPS = {
104      'build.prop': common.PartitionBuildProps.FromDictionary(
105          'system', {
106              'ro.product.name': 'product-name',
107              'ro.build.thumbprint': 'build-thumbprint',
108              'ro.build.bar': 'build-bar'}
109      ),
110      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
111          'vendor', {
112              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
113      ),
114      'property1': 'value1',
115      'property2': 4096,
116      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
117  }
118
119  TEST_OEM_DICTS = [
120      {
121          'ro.product.brand': 'brand1',
122          'ro.product.device': 'device1',
123      },
124      {
125          'ro.product.brand': 'brand2',
126          'ro.product.device': 'device2',
127      },
128      {
129          'ro.product.brand': 'brand3',
130          'ro.product.device': 'device3',
131      },
132  ]
133
134  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = {
135      'build.prop': common.PartitionBuildProps.FromDictionary(
136          'system', {
137              'ro.build.fingerprint': 'build-fingerprint',
138              'ro.product.property_source_order':
139                  'product,odm,vendor,system_ext,system'}
140      ),
141      'system.build.prop': common.PartitionBuildProps.FromDictionary(
142          'system', {
143              'ro.product.system.device': 'system-product-device'}
144      ),
145      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
146          'vendor', {
147              'ro.product.vendor.device': 'vendor-product-device'}
148      ),
149  }
150
151  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = {
152      'build.prop': common.PartitionBuildProps.FromDictionary(
153          'system', {
154              'ro.build.fingerprint': 'build-fingerprint',
155              'ro.product.property_source_order':
156                  'product,product_services,odm,vendor,system',
157              'ro.build.version.release': '10',
158              'ro.build.version.codename': 'REL'}
159      ),
160      'system.build.prop': common.PartitionBuildProps.FromDictionary(
161          'system', {
162              'ro.product.system.device': 'system-product-device'}
163      ),
164      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
165          'vendor', {
166              'ro.product.vendor.device': 'vendor-product-device'}
167      ),
168  }
169
170  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = {
171      'build.prop': common.PartitionBuildProps.FromDictionary(
172          'system', {
173              'ro.product.device': 'product-device',
174              'ro.build.fingerprint': 'build-fingerprint',
175              'ro.build.version.release': '9',
176              'ro.build.version.codename': 'REL'}
177      ),
178      'system.build.prop': common.PartitionBuildProps.FromDictionary(
179          'system', {
180              'ro.product.system.device': 'system-product-device'}
181      ),
182      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
183          'vendor', {
184              'ro.product.vendor.device': 'vendor-product-device'}
185      ),
186  }
187
188  def test_init(self):
189    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
190    self.assertEqual('product-device', target_info.device)
191    self.assertEqual('build-fingerprint', target_info.fingerprint)
192    self.assertFalse(target_info.is_ab)
193    self.assertIsNone(target_info.oem_props)
194
195  def test_init_with_oem_props(self):
196    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
197                                   self.TEST_OEM_DICTS)
198    self.assertEqual('device1', target_info.device)
199    self.assertEqual('brand1/product-name/device1:build-thumbprint',
200                     target_info.fingerprint)
201
202    # Swap the order in oem_dicts, which would lead to different BuildInfo.
203    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
204    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
205    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
206                                   oem_dicts)
207    self.assertEqual('device3', target_info.device)
208    self.assertEqual('brand3/product-name/device3:build-thumbprint',
209                     target_info.fingerprint)
210
211  def test_init_badFingerprint(self):
212    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
213    info_dict['build.prop'].build_props[
214        'ro.build.fingerprint'] = 'bad fingerprint'
215    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
216
217    info_dict['build.prop'].build_props[
218        'ro.build.fingerprint'] = 'bad\x80fingerprint'
219    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
220
221  def test_init_goodFingerprint(self):
222    info_dict = copy.deepcopy(self.TEST_INFO_FINGERPRINT_DICT)
223    build_info = common.BuildInfo(info_dict)
224    self.assertEqual(
225      'product-brand/product-name/product-device:version-release/build-id/'
226      'version-incremental:build-type/build-tags', build_info.fingerprint)
227
228    build_props = info_dict['build.prop'].build_props
229    del build_props['ro.build.id']
230    build_props['ro.build.legacy.id'] = 'legacy-build-id'
231    build_info = common.BuildInfo(info_dict, use_legacy_id=True)
232    self.assertEqual(
233      'product-brand/product-name/product-device:version-release/'
234      'legacy-build-id/version-incremental:build-type/build-tags',
235      build_info.fingerprint)
236
237    self.assertRaises(common.ExternalError, common.BuildInfo, info_dict, None,
238                      False)
239
240    info_dict['avb_enable'] = 'true'
241    info_dict['vbmeta_digest'] = 'abcde12345'
242    build_info = common.BuildInfo(info_dict, use_legacy_id=False)
243    self.assertEqual(
244      'product-brand/product-name/product-device:version-release/'
245      'legacy-build-id.abcde123/version-incremental:build-type/build-tags',
246      build_info.fingerprint)
247
248  def test___getitem__(self):
249    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
250    self.assertEqual('value1', target_info['property1'])
251    self.assertEqual(4096, target_info['property2'])
252    self.assertEqual('build-foo',
253                     target_info['build.prop'].GetProp('ro.build.foo'))
254
255  def test___getitem__with_oem_props(self):
256    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
257                                   self.TEST_OEM_DICTS)
258    self.assertEqual('value1', target_info['property1'])
259    self.assertEqual(4096, target_info['property2'])
260    self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo'))
261
262  def test___setitem__(self):
263    target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
264    self.assertEqual('value1', target_info['property1'])
265    target_info['property1'] = 'value2'
266    self.assertEqual('value2', target_info['property1'])
267
268    self.assertEqual('build-foo',
269                     target_info['build.prop'].GetProp('ro.build.foo'))
270    target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar'
271    self.assertEqual('build-bar',
272                     target_info['build.prop'].GetProp('ro.build.foo'))
273
274  def test_get(self):
275    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
276    self.assertEqual('value1', target_info.get('property1'))
277    self.assertEqual(4096, target_info.get('property2'))
278    self.assertEqual(4096, target_info.get('property2', 1024))
279    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
280    self.assertEqual('build-foo',
281                     target_info.get('build.prop').GetProp('ro.build.foo'))
282
283  def test_get_with_oem_props(self):
284    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
285                                   self.TEST_OEM_DICTS)
286    self.assertEqual('value1', target_info.get('property1'))
287    self.assertEqual(4096, target_info.get('property2'))
288    self.assertEqual(4096, target_info.get('property2', 1024))
289    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
290    self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo'))
291
292  def test_items(self):
293    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
294    items = target_info.items()
295    self.assertIn(('property1', 'value1'), items)
296    self.assertIn(('property2', 4096), items)
297
298  def test_GetBuildProp(self):
299    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
300    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
301    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
302                      'ro.build.nonexistent')
303
304  def test_GetBuildProp_with_oem_props(self):
305    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
306                                   self.TEST_OEM_DICTS)
307    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
308    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
309                      'ro.build.nonexistent')
310
311  def test_GetPartitionFingerprint(self):
312    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
313    self.assertEqual(
314        target_info.GetPartitionFingerprint('vendor'),
315        'vendor-product-brand/vendor-product-name/vendor-product-device'
316        ':vendor-version-release/vendor-build-id/vendor-version-incremental'
317        ':vendor-build-type/vendor-build-tags')
318
319  def test_GetPartitionFingerprint_system_other_uses_system(self):
320    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
321    self.assertEqual(
322        target_info.GetPartitionFingerprint('system_other'),
323        target_info.GetPartitionFingerprint('system'))
324
325  def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self):
326    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
327    info_dict['vendor.build.prop'].build_props[
328        'ro.vendor.build.fingerprint'] = 'vendor:fingerprint'
329    target_info = common.BuildInfo(info_dict, None)
330    self.assertEqual(
331        target_info.GetPartitionFingerprint('vendor'),
332        'vendor:fingerprint')
333
334  def test_WriteMountOemScript(self):
335    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
336                                   self.TEST_OEM_DICTS)
337    script_writer = test_utils.MockScriptWriter()
338    target_info.WriteMountOemScript(script_writer)
339    self.assertEqual([('Mount', '/oem', None)], script_writer.lines)
340
341  def test_WriteDeviceAssertions(self):
342    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
343    script_writer = test_utils.MockScriptWriter()
344    target_info.WriteDeviceAssertions(script_writer, False)
345    self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines)
346
347  def test_WriteDeviceAssertions_with_oem_props(self):
348    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
349                                   self.TEST_OEM_DICTS)
350    script_writer = test_utils.MockScriptWriter()
351    target_info.WriteDeviceAssertions(script_writer, False)
352    self.assertEqual(
353        [
354            ('AssertOemProperty', 'ro.product.device',
355             ['device1', 'device2', 'device3'], False),
356            ('AssertOemProperty', 'ro.product.brand',
357             ['brand1', 'brand2', 'brand3'], False),
358        ],
359        script_writer.lines)
360
361  def test_ResolveRoProductProperty_FromVendor(self):
362    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
363    info = common.BuildInfo(info_dict, None)
364    self.assertEqual('vendor-product-device',
365                     info.GetBuildProp('ro.product.device'))
366
367  def test_ResolveRoProductProperty_FromSystem(self):
368    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
369    del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device']
370    info = common.BuildInfo(info_dict, None)
371    self.assertEqual('system-product-device',
372                     info.GetBuildProp('ro.product.device'))
373
374  def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self):
375    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
376    info_dict['build.prop'].build_props[
377        'ro.product.property_source_order'] = 'bad-source'
378    with self.assertRaisesRegexp(common.ExternalError,
379        'Invalid ro.product.property_source_order'):
380      info = common.BuildInfo(info_dict, None)
381      info.GetBuildProp('ro.product.device')
382
383  def test_ResolveRoProductProperty_Android10PropertySearchOrder(self):
384    info_dict = copy.deepcopy(
385        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10)
386    info = common.BuildInfo(info_dict, None)
387    self.assertEqual('vendor-product-device',
388                     info.GetBuildProp('ro.product.device'))
389
390  def test_ResolveRoProductProperty_Android9PropertySearchOrder(self):
391    info_dict = copy.deepcopy(
392        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9)
393    info = common.BuildInfo(info_dict, None)
394    self.assertEqual('product-device',
395                     info.GetBuildProp('ro.product.device'))
396
397
398class CommonZipTest(test_utils.ReleaseToolsTestCase):
399
400  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
401              test_file_name=None, expected_stat=None, expected_mode=0o644,
402              expected_compress_type=zipfile.ZIP_STORED):
403    # Verify the stat if present.
404    if test_file_name is not None:
405      new_stat = os.stat(test_file_name)
406      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
407      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
408
409    # Reopen the zip file to verify.
410    zip_file = zipfile.ZipFile(zip_file_name, "r", allowZip64=True)
411
412    # Verify the timestamp.
413    info = zip_file.getinfo(arcname)
414    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
415
416    # Verify the file mode.
417    mode = (info.external_attr >> 16) & 0o777
418    self.assertEqual(mode, expected_mode)
419
420    # Verify the compress type.
421    self.assertEqual(info.compress_type, expected_compress_type)
422
423    # Verify the zip contents.
424    entry = zip_file.open(arcname)
425    sha1_hash = sha1()
426    for chunk in iter(lambda: entry.read(4 * MiB), b''):
427      sha1_hash.update(chunk)
428    self.assertEqual(expected_hash, sha1_hash.hexdigest())
429    self.assertIsNone(zip_file.testzip())
430
431  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
432    extra_zipwrite_args = dict(extra_zipwrite_args or {})
433
434    test_file = tempfile.NamedTemporaryFile(delete=False)
435    test_file_name = test_file.name
436
437    zip_file = tempfile.NamedTemporaryFile(delete=False)
438    zip_file_name = zip_file.name
439
440    # File names within an archive strip the leading slash.
441    arcname = extra_zipwrite_args.get("arcname", test_file_name)
442    if arcname[0] == "/":
443      arcname = arcname[1:]
444
445    zip_file.close()
446    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
447
448    try:
449      sha1_hash = sha1()
450      for data in contents:
451        sha1_hash.update(bytes(data))
452        test_file.write(bytes(data))
453      test_file.close()
454
455      expected_stat = os.stat(test_file_name)
456      expected_mode = extra_zipwrite_args.get("perms", 0o644)
457      expected_compress_type = extra_zipwrite_args.get("compress_type",
458                                                       zipfile.ZIP_STORED)
459      time.sleep(5)  # Make sure the atime/mtime will change measurably.
460
461      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
462      common.ZipClose(zip_file)
463
464      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
465                   test_file_name, expected_stat, expected_mode,
466                   expected_compress_type)
467    finally:
468      os.remove(test_file_name)
469      os.remove(zip_file_name)
470
471  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
472    extra_args = dict(extra_args or {})
473
474    zip_file = tempfile.NamedTemporaryFile(delete=False)
475    zip_file_name = zip_file.name
476    zip_file.close()
477
478    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
479
480    try:
481      expected_compress_type = extra_args.get("compress_type",
482                                              zipfile.ZIP_STORED)
483      time.sleep(5)  # Make sure the atime/mtime will change measurably.
484
485      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
486        arcname = zinfo_or_arcname
487        expected_mode = extra_args.get("perms", 0o644)
488      else:
489        arcname = zinfo_or_arcname.filename
490        if zinfo_or_arcname.external_attr:
491          zinfo_perms = zinfo_or_arcname.external_attr >> 16
492        else:
493          zinfo_perms = 0o600
494        expected_mode = extra_args.get("perms", zinfo_perms)
495
496      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
497      common.ZipClose(zip_file)
498
499      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
500                   expected_mode=expected_mode,
501                   expected_compress_type=expected_compress_type)
502    finally:
503      os.remove(zip_file_name)
504
505  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
506    extra_args = dict(extra_args or {})
507
508    zip_file = tempfile.NamedTemporaryFile(delete=False)
509    zip_file_name = zip_file.name
510
511    test_file = tempfile.NamedTemporaryFile(delete=False)
512    test_file_name = test_file.name
513
514    arcname_large = test_file_name
515    arcname_small = "bar"
516
517    # File names within an archive strip the leading slash.
518    if arcname_large[0] == "/":
519      arcname_large = arcname_large[1:]
520
521    zip_file.close()
522    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
523
524    try:
525      sha1_hash = sha1()
526      for data in large:
527        sha1_hash.update(data)
528        test_file.write(data)
529      test_file.close()
530
531      expected_stat = os.stat(test_file_name)
532      expected_mode = 0o644
533      expected_compress_type = extra_args.get("compress_type",
534                                              zipfile.ZIP_STORED)
535      time.sleep(5)  # Make sure the atime/mtime will change measurably.
536
537      common.ZipWrite(zip_file, test_file_name, **extra_args)
538      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
539      common.ZipClose(zip_file)
540
541      # Verify the contents written by ZipWrite().
542      self._verify(zip_file, zip_file_name, arcname_large,
543                   sha1_hash.hexdigest(), test_file_name, expected_stat,
544                   expected_mode, expected_compress_type)
545
546      # Verify the contents written by ZipWriteStr().
547      self._verify(zip_file, zip_file_name, arcname_small,
548                   sha1(small).hexdigest(),
549                   expected_compress_type=expected_compress_type)
550    finally:
551      os.remove(zip_file_name)
552      os.remove(test_file_name)
553
554  def _test_reset_ZIP64_LIMIT(self, func, *args):
555    default_limit = (1 << 31) - 1
556    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
557    func(*args)
558    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
559
560  def test_ZipWrite(self):
561    file_contents = os.urandom(1024)
562    self._test_ZipWrite(file_contents)
563
564  def test_ZipWrite_with_opts(self):
565    file_contents = os.urandom(1024)
566    self._test_ZipWrite(file_contents, {
567        "arcname": "foobar",
568        "perms": 0o777,
569        "compress_type": zipfile.ZIP_DEFLATED,
570    })
571    self._test_ZipWrite(file_contents, {
572        "arcname": "foobar",
573        "perms": 0o700,
574        "compress_type": zipfile.ZIP_STORED,
575    })
576
577  def test_ZipWrite_large_file(self):
578    file_contents = get_2gb_string()
579    self._test_ZipWrite(file_contents, {
580        "compress_type": zipfile.ZIP_DEFLATED,
581    })
582
583  def test_ZipWrite_resets_ZIP64_LIMIT(self):
584    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
585
586  def test_ZipWriteStr(self):
587    random_string = os.urandom(1024)
588    # Passing arcname
589    self._test_ZipWriteStr("foo", random_string)
590
591    # Passing zinfo
592    zinfo = zipfile.ZipInfo(filename="foo")
593    self._test_ZipWriteStr(zinfo, random_string)
594
595    # Timestamp in the zinfo should be overwritten.
596    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
597    self._test_ZipWriteStr(zinfo, random_string)
598
599  def test_ZipWriteStr_with_opts(self):
600    random_string = os.urandom(1024)
601    # Passing arcname
602    self._test_ZipWriteStr("foo", random_string, {
603        "perms": 0o700,
604        "compress_type": zipfile.ZIP_DEFLATED,
605    })
606    self._test_ZipWriteStr("bar", random_string, {
607        "compress_type": zipfile.ZIP_STORED,
608    })
609
610    # Passing zinfo
611    zinfo = zipfile.ZipInfo(filename="foo")
612    self._test_ZipWriteStr(zinfo, random_string, {
613        "compress_type": zipfile.ZIP_DEFLATED,
614    })
615    self._test_ZipWriteStr(zinfo, random_string, {
616        "perms": 0o600,
617        "compress_type": zipfile.ZIP_STORED,
618    })
619    self._test_ZipWriteStr(zinfo, random_string, {
620        "perms": 0o000,
621        "compress_type": zipfile.ZIP_STORED,
622    })
623
624  def test_ZipWriteStr_large_file(self):
625    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
626    # the workaround. We will only test the case of writing a string into a
627    # large archive.
628    long_string = get_2gb_string()
629    short_string = os.urandom(1024)
630    self._test_ZipWriteStr_large_file(long_string, short_string, {
631        "compress_type": zipfile.ZIP_DEFLATED,
632    })
633
634  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
635    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'')
636    zinfo = zipfile.ZipInfo(filename="foo")
637    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'')
638
639  def test_bug21309935(self):
640    zip_file = tempfile.NamedTemporaryFile(delete=False)
641    zip_file_name = zip_file.name
642    zip_file.close()
643
644    try:
645      random_string = os.urandom(1024)
646      zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
647      # Default perms should be 0o644 when passing the filename.
648      common.ZipWriteStr(zip_file, "foo", random_string)
649      # Honor the specified perms.
650      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
651      # The perms in zinfo should be untouched.
652      zinfo = zipfile.ZipInfo(filename="baz")
653      zinfo.external_attr = 0o740 << 16
654      common.ZipWriteStr(zip_file, zinfo, random_string)
655      # Explicitly specified perms has the priority.
656      zinfo = zipfile.ZipInfo(filename="qux")
657      zinfo.external_attr = 0o700 << 16
658      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
659      common.ZipClose(zip_file)
660
661      self._verify(zip_file, zip_file_name, "foo",
662                   sha1(random_string).hexdigest(),
663                   expected_mode=0o644)
664      self._verify(zip_file, zip_file_name, "bar",
665                   sha1(random_string).hexdigest(),
666                   expected_mode=0o755)
667      self._verify(zip_file, zip_file_name, "baz",
668                   sha1(random_string).hexdigest(),
669                   expected_mode=0o740)
670      self._verify(zip_file, zip_file_name, "qux",
671                   sha1(random_string).hexdigest(),
672                   expected_mode=0o400)
673    finally:
674      os.remove(zip_file_name)
675
676  @test_utils.SkipIfExternalToolsUnavailable()
677  def test_ZipDelete(self):
678    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
679    output_zip = zipfile.ZipFile(zip_file.name, 'w',
680                                 compression=zipfile.ZIP_DEFLATED)
681    with tempfile.NamedTemporaryFile() as entry_file:
682      entry_file.write(os.urandom(1024))
683      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
684      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
685      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
686      common.ZipClose(output_zip)
687    zip_file.close()
688
689    try:
690      common.ZipDelete(zip_file.name, 'Test2')
691      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
692        entries = check_zip.namelist()
693        self.assertTrue('Test1' in entries)
694        self.assertFalse('Test2' in entries)
695        self.assertTrue('Test3' in entries)
696
697      self.assertRaises(
698          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
699      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
700        entries = check_zip.namelist()
701        self.assertTrue('Test1' in entries)
702        self.assertFalse('Test2' in entries)
703        self.assertTrue('Test3' in entries)
704
705      common.ZipDelete(zip_file.name, ['Test3'])
706      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
707        entries = check_zip.namelist()
708        self.assertTrue('Test1' in entries)
709        self.assertFalse('Test2' in entries)
710        self.assertFalse('Test3' in entries)
711
712      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
713      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
714        entries = check_zip.namelist()
715        self.assertFalse('Test1' in entries)
716        self.assertFalse('Test2' in entries)
717        self.assertFalse('Test3' in entries)
718    finally:
719      os.remove(zip_file.name)
720
721  @staticmethod
722  def _test_UnzipTemp_createZipFile():
723    zip_file = common.MakeTempFile(suffix='.zip')
724    output_zip = zipfile.ZipFile(
725        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
726    contents = os.urandom(1024)
727    with tempfile.NamedTemporaryFile() as entry_file:
728      entry_file.write(contents)
729      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
730      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
731      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
732      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
733      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
734      common.ZipClose(output_zip)
735    common.ZipClose(output_zip)
736    return zip_file
737
738  @test_utils.SkipIfExternalToolsUnavailable()
739  def test_UnzipTemp(self):
740    zip_file = self._test_UnzipTemp_createZipFile()
741    unzipped_dir = common.UnzipTemp(zip_file)
742    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
743    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
744    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
745    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
746    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
747
748  @test_utils.SkipIfExternalToolsUnavailable()
749  def test_UnzipTemp_withPatterns(self):
750    zip_file = self._test_UnzipTemp_createZipFile()
751
752    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
753    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
754    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
755    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
756    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
757    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
758
759    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
760    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
761    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
762    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
763    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
764    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
765
766    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
767    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
768    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
769    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
770    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
771    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
772
773    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
774    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
775    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
776    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
777    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
778    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
779
780  def test_UnzipTemp_withEmptyPatterns(self):
781    zip_file = self._test_UnzipTemp_createZipFile()
782    unzipped_dir = common.UnzipTemp(zip_file, [])
783    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
784    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
785    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
786    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
787    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
788
789  @test_utils.SkipIfExternalToolsUnavailable()
790  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
791    zip_file = self._test_UnzipTemp_createZipFile()
792    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
793    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
794    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
795    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
796    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
797    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
798
799  def test_UnzipTemp_withNoMatchingPatterns(self):
800    zip_file = self._test_UnzipTemp_createZipFile()
801    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
802    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
803    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
804    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
805    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
806    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
807
808
809class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
810  """Tests the APK utils related functions."""
811
812  APKCERTS_TXT1 = (
813      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
814      ' private_key="certs/devkey.pk8"\n'
815      'name="Settings.apk"'
816      ' certificate="build/make/target/product/security/platform.x509.pem"'
817      ' private_key="build/make/target/product/security/platform.pk8"\n'
818      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
819  )
820
821  APKCERTS_CERTMAP1 = {
822      'RecoveryLocalizer.apk' : 'certs/devkey',
823      'Settings.apk' : 'build/make/target/product/security/platform',
824      'TV.apk' : 'PRESIGNED',
825  }
826
827  APKCERTS_TXT2 = (
828      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
829      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
830      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
831      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
832      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
833      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
834      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
835      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
836  )
837
838  APKCERTS_CERTMAP2 = {
839      'Compressed1.apk' : 'certs/compressed1',
840      'Compressed2a.apk' : 'certs/compressed2',
841      'Compressed2b.apk' : 'certs/compressed2',
842      'Compressed3.apk' : 'certs/compressed3',
843  }
844
845  APKCERTS_TXT3 = (
846      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
847      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
848  )
849
850  APKCERTS_CERTMAP3 = {
851      'Compressed4.apk' : 'certs/compressed4',
852  }
853
854  # Test parsing with no optional fields, both optional fields, and only the
855  # partition optional field.
856  APKCERTS_TXT4 = (
857      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
858      ' private_key="certs/devkey.pk8"\n'
859      'name="Settings.apk"'
860      ' certificate="build/make/target/product/security/platform.x509.pem"'
861      ' private_key="build/make/target/product/security/platform.pk8"'
862      ' compressed="gz" partition="system"\n'
863      'name="TV.apk" certificate="PRESIGNED" private_key=""'
864      ' partition="product"\n'
865  )
866
867  APKCERTS_CERTMAP4 = {
868      'RecoveryLocalizer.apk' : 'certs/devkey',
869      'Settings.apk' : 'build/make/target/product/security/platform',
870      'TV.apk' : 'PRESIGNED',
871  }
872
873  def setUp(self):
874    self.testdata_dir = test_utils.get_testdata_dir()
875
876  @staticmethod
877  def _write_apkcerts_txt(apkcerts_txt, additional=None):
878    if additional is None:
879      additional = []
880    target_files = common.MakeTempFile(suffix='.zip')
881    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
882      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
883      for entry in additional:
884        target_files_zip.writestr(entry, '')
885    return target_files
886
887  def test_ReadApkCerts_NoncompressedApks(self):
888    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
889    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
890      certmap, ext = common.ReadApkCerts(input_zip)
891
892    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
893    self.assertIsNone(ext)
894
895  def test_ReadApkCerts_CompressedApks(self):
896    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
897    # not stored in '.gz' format, so it shouldn't be considered as installed.
898    target_files = self._write_apkcerts_txt(
899        self.APKCERTS_TXT2,
900        ['Compressed1.apk.gz', 'Compressed3.apk'])
901
902    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
903      certmap, ext = common.ReadApkCerts(input_zip)
904
905    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
906    self.assertEqual('.gz', ext)
907
908    # Alternative case with '.xz'.
909    target_files = self._write_apkcerts_txt(
910        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
911
912    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
913      certmap, ext = common.ReadApkCerts(input_zip)
914
915    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
916    self.assertEqual('.xz', ext)
917
918  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
919    target_files = self._write_apkcerts_txt(
920        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
921        ['Compressed1.apk.gz', 'Compressed3.apk'])
922
923    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
924      certmap, ext = common.ReadApkCerts(input_zip)
925
926    certmap_merged = self.APKCERTS_CERTMAP1.copy()
927    certmap_merged.update(self.APKCERTS_CERTMAP2)
928    self.assertDictEqual(certmap_merged, certmap)
929    self.assertEqual('.gz', ext)
930
931  def test_ReadApkCerts_MultipleCompressionMethods(self):
932    target_files = self._write_apkcerts_txt(
933        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
934        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
935
936    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
937      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
938
939  def test_ReadApkCerts_MismatchingKeys(self):
940    malformed_apkcerts_txt = (
941        'name="App1.apk" certificate="certs/cert1.x509.pem"'
942        ' private_key="certs/cert2.pk8"\n'
943    )
944    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
945
946    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
947      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
948
949  def test_ReadApkCerts_WithWithoutOptionalFields(self):
950    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4)
951    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
952      certmap, ext = common.ReadApkCerts(input_zip)
953
954    self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap)
955    self.assertIsNone(ext)
956
957  def test_ExtractPublicKey(self):
958    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
959    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
960    with open(pubkey) as pubkey_fp:
961      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
962
963  def test_ExtractPublicKey_invalidInput(self):
964    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
965    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
966
967  @test_utils.SkipIfExternalToolsUnavailable()
968  def test_ExtractAvbPublicKey(self):
969    privkey = os.path.join(self.testdata_dir, 'testkey.key')
970    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
971    extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey)
972    extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey)
973    with open(extracted_from_privkey, 'rb') as privkey_fp, \
974        open(extracted_from_pubkey, 'rb') as pubkey_fp:
975      self.assertEqual(privkey_fp.read(), pubkey_fp.read())
976
977  def test_ParseCertificate(self):
978    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
979
980    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
981    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
982                      universal_newlines=False)
983    expected, _ = proc.communicate()
984    self.assertEqual(0, proc.returncode)
985
986    with open(cert) as cert_fp:
987      actual = common.ParseCertificate(cert_fp.read())
988    self.assertEqual(expected, actual)
989
990  @test_utils.SkipIfExternalToolsUnavailable()
991  def test_GetMinSdkVersion(self):
992    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
993    self.assertEqual('24', common.GetMinSdkVersion(test_app))
994
995  @test_utils.SkipIfExternalToolsUnavailable()
996  def test_GetMinSdkVersion_invalidInput(self):
997    self.assertRaises(
998        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
999
1000  @test_utils.SkipIfExternalToolsUnavailable()
1001  def test_GetMinSdkVersionInt(self):
1002    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
1003    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
1004
1005  @test_utils.SkipIfExternalToolsUnavailable()
1006  def test_GetMinSdkVersionInt_invalidInput(self):
1007    self.assertRaises(
1008        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
1009        {})
1010
1011
1012class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
1013
1014  def setUp(self):
1015    self.testdata_dir = test_utils.get_testdata_dir()
1016
1017  @test_utils.SkipIfExternalToolsUnavailable()
1018  def test_GetSparseImage_emptyBlockMapFile(self):
1019    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1020    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1021      target_files_zip.write(
1022          test_utils.construct_sparse_image([
1023              (0xCAC1, 6),
1024              (0xCAC3, 3),
1025              (0xCAC1, 4)]),
1026          arcname='IMAGES/system.img')
1027      target_files_zip.writestr('IMAGES/system.map', '')
1028      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1029      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1030
1031    tempdir = common.UnzipTemp(target_files)
1032    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1033      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1034
1035    self.assertDictEqual(
1036        {
1037            '__COPY': RangeSet("0"),
1038            '__NONZERO-0': RangeSet("1-5 9-12"),
1039        },
1040        sparse_image.file_map)
1041
1042  def test_PartitionMapFromTargetFiles(self):
1043    target_files_dir = common.MakeTempDir()
1044    os.makedirs(os.path.join(target_files_dir, 'SYSTEM'))
1045    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor'))
1046    os.makedirs(os.path.join(target_files_dir, 'PRODUCT'))
1047    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'product'))
1048    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor', 'odm'))
1049    os.makedirs(os.path.join(target_files_dir, 'VENDOR_DLKM'))
1050    partition_map = common.PartitionMapFromTargetFiles(target_files_dir)
1051    self.assertDictEqual(
1052        partition_map,
1053        {
1054            'system': 'SYSTEM',
1055            'vendor': 'SYSTEM/vendor',
1056            # Prefer PRODUCT over SYSTEM/product
1057            'product': 'PRODUCT',
1058            'odm': 'SYSTEM/vendor/odm',
1059            'vendor_dlkm': 'VENDOR_DLKM',
1060            # No system_ext or odm_dlkm
1061        })
1062
1063  def test_SharedUidPartitionViolations(self):
1064    uid_dict = {
1065        'android.uid.phone': {
1066            'system': ['system_phone.apk'],
1067            'system_ext': ['system_ext_phone.apk'],
1068        },
1069        'android.uid.wifi': {
1070            'vendor': ['vendor_wifi.apk'],
1071            'odm': ['odm_wifi.apk'],
1072        },
1073    }
1074    errors = common.SharedUidPartitionViolations(
1075        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1076    self.assertEqual(errors, [])
1077
1078  def test_SharedUidPartitionViolations_Violation(self):
1079    uid_dict = {
1080        'android.uid.phone': {
1081            'system': ['system_phone.apk'],
1082            'vendor': ['vendor_phone.apk'],
1083        },
1084    }
1085    errors = common.SharedUidPartitionViolations(
1086        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1087    self.assertIn(
1088        ('APK sharedUserId "android.uid.phone" found across partition groups '
1089         'in partitions "system,vendor"'), errors)
1090
1091  def test_GetSparseImage_missingImageFile(self):
1092    self.assertRaises(
1093        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
1094        None, False)
1095    self.assertRaises(
1096        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
1097        None, False)
1098
1099  @test_utils.SkipIfExternalToolsUnavailable()
1100  def test_GetSparseImage_missingBlockMapFile(self):
1101    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1102    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1103      target_files_zip.write(
1104          test_utils.construct_sparse_image([
1105              (0xCAC1, 6),
1106              (0xCAC3, 3),
1107              (0xCAC1, 4)]),
1108          arcname='IMAGES/system.img')
1109      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1110      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1111
1112    tempdir = common.UnzipTemp(target_files)
1113    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1114      self.assertRaises(
1115          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1116          False)
1117
1118  @test_utils.SkipIfExternalToolsUnavailable()
1119  def test_GetSparseImage_sharedBlocks_notAllowed(self):
1120    """Tests the case of having overlapping blocks but disallowed."""
1121    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1122    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1123      target_files_zip.write(
1124          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1125          arcname='IMAGES/system.img')
1126      # Block 10 is shared between two files.
1127      target_files_zip.writestr(
1128          'IMAGES/system.map',
1129          '\n'.join([
1130              '/system/file1 1-5 9-10',
1131              '/system/file2 10-12']))
1132      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1133      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1134
1135    tempdir = common.UnzipTemp(target_files)
1136    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1137      self.assertRaises(
1138          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1139          False)
1140
1141  @test_utils.SkipIfExternalToolsUnavailable()
1142  def test_GetSparseImage_sharedBlocks_allowed(self):
1143    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
1144    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1145    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1146      # Construct an image with a care_map of "0-5 9-12".
1147      target_files_zip.write(
1148          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1149          arcname='IMAGES/system.img')
1150      # Block 10 is shared between two files.
1151      target_files_zip.writestr(
1152          'IMAGES/system.map',
1153          '\n'.join([
1154              '/system/file1 1-5 9-10',
1155              '/system/file2 10-12']))
1156      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1157      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1158
1159    tempdir = common.UnzipTemp(target_files)
1160    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1161      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
1162
1163    self.assertDictEqual(
1164        {
1165            '__COPY': RangeSet("0"),
1166            '__NONZERO-0': RangeSet("6-8 13-15"),
1167            '/system/file1': RangeSet("1-5 9-10"),
1168            '/system/file2': RangeSet("11-12"),
1169        },
1170        sparse_image.file_map)
1171
1172    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
1173    # 'incomplete'.
1174    self.assertTrue(
1175        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
1176    self.assertNotIn(
1177        'incomplete', sparse_image.file_map['/system/file2'].extra)
1178
1179    # '/system/file1' will only contain one field -- a copy of the input text.
1180    self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra))
1181
1182    # Meta entries should not have any extra tag.
1183    self.assertFalse(sparse_image.file_map['__COPY'].extra)
1184    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
1185
1186  @test_utils.SkipIfExternalToolsUnavailable()
1187  def test_GetSparseImage_incompleteRanges(self):
1188    """Tests the case of ext4 images with holes."""
1189    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1190    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1191      target_files_zip.write(
1192          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1193          arcname='IMAGES/system.img')
1194      target_files_zip.writestr(
1195          'IMAGES/system.map',
1196          '\n'.join([
1197              '/system/file1 1-5 9-10',
1198              '/system/file2 11-12']))
1199      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1200      # '/system/file2' has less blocks listed (2) than actual (3).
1201      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1202
1203    tempdir = common.UnzipTemp(target_files)
1204    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1205      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1206
1207    self.assertEqual(
1208        '1-5 9-10',
1209        sparse_image.file_map['/system/file1'].extra['text_str'])
1210    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
1211
1212  @test_utils.SkipIfExternalToolsUnavailable()
1213  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
1214    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1215    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1216      target_files_zip.write(
1217          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1218          arcname='IMAGES/system.img')
1219      target_files_zip.writestr(
1220          'IMAGES/system.map',
1221          '\n'.join([
1222              '//system/file1 1-5 9-10',
1223              '//system/file2 11-12',
1224              '/system/app/file3 13-15']))
1225      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1226      # '/system/file2' has less blocks listed (2) than actual (3).
1227      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1228      # '/system/app/file3' has less blocks listed (3) than actual (4).
1229      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
1230
1231    tempdir = common.UnzipTemp(target_files)
1232    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1233      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1234
1235    self.assertEqual(
1236        '1-5 9-10',
1237        sparse_image.file_map['//system/file1'].extra['text_str'])
1238    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
1239    self.assertTrue(
1240        sparse_image.file_map['/system/app/file3'].extra['incomplete'])
1241
1242  @test_utils.SkipIfExternalToolsUnavailable()
1243  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
1244    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1245    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1246      target_files_zip.write(
1247          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1248          arcname='IMAGES/system.img')
1249      target_files_zip.writestr(
1250          'IMAGES/system.map',
1251          '\n'.join([
1252              '//system/file1 1-5 9-10',
1253              '//init.rc 13-15']))
1254      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1255      # '/init.rc' has less blocks listed (3) than actual (4).
1256      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
1257
1258    tempdir = common.UnzipTemp(target_files)
1259    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1260      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1261
1262    self.assertEqual(
1263        '1-5 9-10',
1264        sparse_image.file_map['//system/file1'].extra['text_str'])
1265    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
1266
1267  @test_utils.SkipIfExternalToolsUnavailable()
1268  def test_GetSparseImage_fileNotFound(self):
1269    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1270    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1271      target_files_zip.write(
1272          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1273          arcname='IMAGES/system.img')
1274      target_files_zip.writestr(
1275          'IMAGES/system.map',
1276          '\n'.join([
1277              '//system/file1 1-5 9-10',
1278              '//system/file2 11-12']))
1279      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1280
1281    tempdir = common.UnzipTemp(target_files)
1282    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1283      self.assertRaises(
1284          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1285          False)
1286
1287  @test_utils.SkipIfExternalToolsUnavailable()
1288  def test_GetAvbChainedPartitionArg(self):
1289    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1290    info_dict = {
1291        'avb_avbtool': 'avbtool',
1292        'avb_system_key_path': pubkey,
1293        'avb_system_rollback_index_location': 2,
1294    }
1295    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
1296    self.assertEqual(3, len(args))
1297    self.assertEqual('system', args[0])
1298    self.assertEqual('2', args[1])
1299    self.assertTrue(os.path.exists(args[2]))
1300
1301  @test_utils.SkipIfExternalToolsUnavailable()
1302  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
1303    key = os.path.join(self.testdata_dir, 'testkey.key')
1304    info_dict = {
1305        'avb_avbtool': 'avbtool',
1306        'avb_product_key_path': key,
1307        'avb_product_rollback_index_location': 2,
1308    }
1309    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
1310    self.assertEqual(3, len(args))
1311    self.assertEqual('product', args[0])
1312    self.assertEqual('2', args[1])
1313    self.assertTrue(os.path.exists(args[2]))
1314
1315  @test_utils.SkipIfExternalToolsUnavailable()
1316  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
1317    info_dict = {
1318        'avb_avbtool': 'avbtool',
1319        'avb_system_key_path': 'does-not-exist',
1320        'avb_system_rollback_index_location': 2,
1321    }
1322    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1323    args = common.GetAvbChainedPartitionArg(
1324        'system', info_dict, pubkey).split(':')
1325    self.assertEqual(3, len(args))
1326    self.assertEqual('system', args[0])
1327    self.assertEqual('2', args[1])
1328    self.assertTrue(os.path.exists(args[2]))
1329
1330  @test_utils.SkipIfExternalToolsUnavailable()
1331  def test_GetAvbChainedPartitionArg_invalidKey(self):
1332    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
1333    info_dict = {
1334        'avb_avbtool': 'avbtool',
1335        'avb_system_key_path': pubkey,
1336        'avb_system_rollback_index_location': 2,
1337    }
1338    self.assertRaises(
1339        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
1340        info_dict)
1341
1342  INFO_DICT_DEFAULT = {
1343      'recovery_api_version': 3,
1344      'fstab_version': 2,
1345      'system_root_image': 'true',
1346      'no_recovery' : 'true',
1347      'recovery_as_boot': 'true',
1348  }
1349
1350  def test_LoadListFromFile(self):
1351    file_path = os.path.join(self.testdata_dir,
1352                             'merge_config_framework_item_list')
1353    contents = common.LoadListFromFile(file_path)
1354    expected_contents = [
1355        'META/apkcerts.txt',
1356        'META/filesystem_config.txt',
1357        'META/root_filesystem_config.txt',
1358        'META/system_manifest.xml',
1359        'META/system_matrix.xml',
1360        'META/update_engine_config.txt',
1361        'PRODUCT/*',
1362        'ROOT/*',
1363        'SYSTEM/*',
1364    ]
1365    self.assertEqual(sorted(contents), sorted(expected_contents))
1366
1367  @staticmethod
1368  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
1369    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1370    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1371      info_values = ''.join(
1372          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
1373      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
1374
1375      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
1376      if info_dict.get('system_root_image') == 'true':
1377        fstab_values = FSTAB_TEMPLATE.format('/')
1378      else:
1379        fstab_values = FSTAB_TEMPLATE.format('/system')
1380      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)
1381
1382      common.ZipWriteStr(
1383          target_files_zip, 'META/file_contexts', 'file-contexts')
1384    return target_files
1385
1386  def test_LoadInfoDict(self):
1387    target_files = self._test_LoadInfoDict_createTargetFiles(
1388        self.INFO_DICT_DEFAULT,
1389        'BOOT/RAMDISK/system/etc/recovery.fstab')
1390    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1391      loaded_dict = common.LoadInfoDict(target_files_zip)
1392      self.assertEqual(3, loaded_dict['recovery_api_version'])
1393      self.assertEqual(2, loaded_dict['fstab_version'])
1394      self.assertIn('/', loaded_dict['fstab'])
1395      self.assertIn('/system', loaded_dict['fstab'])
1396
1397  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
1398    target_files = self._test_LoadInfoDict_createTargetFiles(
1399        self.INFO_DICT_DEFAULT,
1400        'BOOT/RAMDISK/etc/recovery.fstab')
1401    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1402      loaded_dict = common.LoadInfoDict(target_files_zip)
1403      self.assertEqual(3, loaded_dict['recovery_api_version'])
1404      self.assertEqual(2, loaded_dict['fstab_version'])
1405      self.assertIn('/', loaded_dict['fstab'])
1406      self.assertIn('/system', loaded_dict['fstab'])
1407
1408  @test_utils.SkipIfExternalToolsUnavailable()
1409  def test_LoadInfoDict_dirInput(self):
1410    target_files = self._test_LoadInfoDict_createTargetFiles(
1411        self.INFO_DICT_DEFAULT,
1412        'BOOT/RAMDISK/system/etc/recovery.fstab')
1413    unzipped = common.UnzipTemp(target_files)
1414    loaded_dict = common.LoadInfoDict(unzipped)
1415    self.assertEqual(3, loaded_dict['recovery_api_version'])
1416    self.assertEqual(2, loaded_dict['fstab_version'])
1417    self.assertIn('/', loaded_dict['fstab'])
1418    self.assertIn('/system', loaded_dict['fstab'])
1419
1420  @test_utils.SkipIfExternalToolsUnavailable()
1421  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
1422    target_files = self._test_LoadInfoDict_createTargetFiles(
1423        self.INFO_DICT_DEFAULT,
1424        'BOOT/RAMDISK/system/etc/recovery.fstab')
1425    unzipped = common.UnzipTemp(target_files)
1426    loaded_dict = common.LoadInfoDict(unzipped)
1427    self.assertEqual(3, loaded_dict['recovery_api_version'])
1428    self.assertEqual(2, loaded_dict['fstab_version'])
1429    self.assertIn('/', loaded_dict['fstab'])
1430    self.assertIn('/system', loaded_dict['fstab'])
1431
1432  def test_LoadInfoDict_systemRootImageFalse(self):
1433    # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
1434    # launched prior to P will likely have this config.
1435    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1436    del info_dict['no_recovery']
1437    del info_dict['system_root_image']
1438    del info_dict['recovery_as_boot']
1439    target_files = self._test_LoadInfoDict_createTargetFiles(
1440        info_dict,
1441        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1442    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1443      loaded_dict = common.LoadInfoDict(target_files_zip)
1444      self.assertEqual(3, loaded_dict['recovery_api_version'])
1445      self.assertEqual(2, loaded_dict['fstab_version'])
1446      self.assertNotIn('/', loaded_dict['fstab'])
1447      self.assertIn('/system', loaded_dict['fstab'])
1448
1449  def test_LoadInfoDict_recoveryAsBootFalse(self):
1450    # Devices using system-as-root, but with standalone recovery image. Non-A/B
1451    # devices launched since P will likely have this config.
1452    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1453    del info_dict['no_recovery']
1454    del info_dict['recovery_as_boot']
1455    target_files = self._test_LoadInfoDict_createTargetFiles(
1456        info_dict,
1457        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1458    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1459      loaded_dict = common.LoadInfoDict(target_files_zip)
1460      self.assertEqual(3, loaded_dict['recovery_api_version'])
1461      self.assertEqual(2, loaded_dict['fstab_version'])
1462      self.assertIn('/', loaded_dict['fstab'])
1463      self.assertIn('/system', loaded_dict['fstab'])
1464
1465  def test_LoadInfoDict_noRecoveryTrue(self):
1466    # Device doesn't have a recovery partition at all.
1467    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1468    del info_dict['recovery_as_boot']
1469    target_files = self._test_LoadInfoDict_createTargetFiles(
1470        info_dict,
1471        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1472    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1473      loaded_dict = common.LoadInfoDict(target_files_zip)
1474      self.assertEqual(3, loaded_dict['recovery_api_version'])
1475      self.assertEqual(2, loaded_dict['fstab_version'])
1476      self.assertIsNone(loaded_dict['fstab'])
1477
1478  @test_utils.SkipIfExternalToolsUnavailable()
1479  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
1480    target_files = self._test_LoadInfoDict_createTargetFiles(
1481        self.INFO_DICT_DEFAULT,
1482        'BOOT/RAMDISK/system/etc/recovery.fstab')
1483    common.ZipDelete(target_files, 'META/misc_info.txt')
1484    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1485      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
1486
1487  @test_utils.SkipIfExternalToolsUnavailable()
1488  def test_LoadInfoDict_repacking(self):
1489    target_files = self._test_LoadInfoDict_createTargetFiles(
1490        self.INFO_DICT_DEFAULT,
1491        'BOOT/RAMDISK/system/etc/recovery.fstab')
1492    unzipped = common.UnzipTemp(target_files)
1493    loaded_dict = common.LoadInfoDict(unzipped, True)
1494    self.assertEqual(3, loaded_dict['recovery_api_version'])
1495    self.assertEqual(2, loaded_dict['fstab_version'])
1496    self.assertIn('/', loaded_dict['fstab'])
1497    self.assertIn('/system', loaded_dict['fstab'])
1498    self.assertEqual(
1499        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
1500    self.assertEqual(
1501        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
1502        loaded_dict['root_fs_config'])
1503
1504  def test_LoadInfoDict_repackingWithZipFileInput(self):
1505    target_files = self._test_LoadInfoDict_createTargetFiles(
1506        self.INFO_DICT_DEFAULT,
1507        'BOOT/RAMDISK/system/etc/recovery.fstab')
1508    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1509      self.assertRaises(
1510          AssertionError, common.LoadInfoDict, target_files_zip, True)
1511
1512  def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self):
1513    framework_dict = {
1514        'use_dynamic_partitions': 'true',
1515        'super_partition_groups': 'group_a',
1516        'dynamic_partition_list': 'system',
1517        'super_group_a_partition_list': 'system',
1518    }
1519    vendor_dict = {
1520        'use_dynamic_partitions': 'true',
1521        'super_partition_groups': 'group_a group_b',
1522        'dynamic_partition_list': 'vendor product',
1523        'super_block_devices': 'super',
1524        'super_super_device_size': '3000',
1525        'super_group_a_partition_list': 'vendor',
1526        'super_group_a_group_size': '1000',
1527        'super_group_b_partition_list': 'product',
1528        'super_group_b_group_size': '2000',
1529    }
1530    merged_dict = common.MergeDynamicPartitionInfoDicts(
1531        framework_dict=framework_dict,
1532        vendor_dict=vendor_dict)
1533    expected_merged_dict = {
1534        'use_dynamic_partitions': 'true',
1535        'super_partition_groups': 'group_a group_b',
1536        'dynamic_partition_list': 'product system vendor',
1537        'super_block_devices': 'super',
1538        'super_super_device_size': '3000',
1539        'super_group_a_partition_list': 'system vendor',
1540        'super_group_a_group_size': '1000',
1541        'super_group_b_partition_list': 'product',
1542        'super_group_b_group_size': '2000',
1543    }
1544    self.assertEqual(merged_dict, expected_merged_dict)
1545
1546  def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self):
1547    framework_dict = {
1548        'use_dynamic_partitions': 'true',
1549        'super_partition_groups': 'group_a',
1550        'dynamic_partition_list': 'system',
1551        'super_group_a_partition_list': 'system',
1552        'super_group_a_group_size': '5000',
1553    }
1554    vendor_dict = {
1555        'use_dynamic_partitions': 'true',
1556        'super_partition_groups': 'group_a group_b',
1557        'dynamic_partition_list': 'vendor product',
1558        'super_group_a_partition_list': 'vendor',
1559        'super_group_a_group_size': '1000',
1560        'super_group_b_partition_list': 'product',
1561        'super_group_b_group_size': '2000',
1562    }
1563    merged_dict = common.MergeDynamicPartitionInfoDicts(
1564        framework_dict=framework_dict,
1565        vendor_dict=vendor_dict)
1566    expected_merged_dict = {
1567        'use_dynamic_partitions': 'true',
1568        'super_partition_groups': 'group_a group_b',
1569        'dynamic_partition_list': 'product system vendor',
1570        'super_group_a_partition_list': 'system vendor',
1571        'super_group_a_group_size': '1000',
1572        'super_group_b_partition_list': 'product',
1573        'super_group_b_group_size': '2000',
1574    }
1575    self.assertEqual(merged_dict, expected_merged_dict)
1576
1577  def test_GetAvbPartitionArg(self):
1578    info_dict = {}
1579    cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict)
1580    self.assertEqual(
1581        ['--include_descriptors_from_image', '/path/to/system.img'], cmd)
1582
1583  @test_utils.SkipIfExternalToolsUnavailable()
1584  def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self):
1585    testdata_dir = test_utils.get_testdata_dir()
1586    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1587    info_dict = {
1588        'avb_avbtool': 'avbtool',
1589        'avb_vendor_key_path': pubkey,
1590        'avb_vendor_rollback_index_location': 5,
1591    }
1592    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
1593    self.assertEqual(2, len(cmd))
1594    self.assertEqual('--chain_partition', cmd[0])
1595    chained_partition_args = cmd[1].split(':')
1596    self.assertEqual(3, len(chained_partition_args))
1597    self.assertEqual('vendor', chained_partition_args[0])
1598    self.assertEqual('5', chained_partition_args[1])
1599    self.assertTrue(os.path.exists(chained_partition_args[2]))
1600
1601  @test_utils.SkipIfExternalToolsUnavailable()
1602  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
1603    testdata_dir = test_utils.get_testdata_dir()
1604    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1605    info_dict = {
1606        'avb_avbtool': 'avbtool',
1607        'avb_recovery_key_path': pubkey,
1608        'avb_recovery_rollback_index_location': 3,
1609    }
1610    cmd = common.GetAvbPartitionArg(
1611        'recovery', '/path/to/recovery.img', info_dict)
1612    self.assertFalse(cmd)
1613
1614  @test_utils.SkipIfExternalToolsUnavailable()
1615  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self):
1616    testdata_dir = test_utils.get_testdata_dir()
1617    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1618    info_dict = {
1619        'ab_update': 'true',
1620        'avb_avbtool': 'avbtool',
1621        'avb_recovery_key_path': pubkey,
1622        'avb_recovery_rollback_index_location': 3,
1623    }
1624    cmd = common.GetAvbPartitionArg(
1625        'recovery', '/path/to/recovery.img', info_dict)
1626    self.assertEqual(2, len(cmd))
1627    self.assertEqual('--chain_partition', cmd[0])
1628    chained_partition_args = cmd[1].split(':')
1629    self.assertEqual(3, len(chained_partition_args))
1630    self.assertEqual('recovery', chained_partition_args[0])
1631    self.assertEqual('3', chained_partition_args[1])
1632    self.assertTrue(os.path.exists(chained_partition_args[2]))
1633
1634  def test_GenerateGkiCertificate_KeyPathNotFound(self):
1635    pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem')
1636    self.assertFalse(os.path.exists(pubkey))
1637
1638    common.OPTIONS.info_dict = {
1639        'gki_signing_key_path': pubkey,
1640        'gki_signing_algorithm': 'SHA256_RSA4096',
1641        'gki_signing_signature_args': '--prop foo:bar',
1642    }
1643    test_file = tempfile.NamedTemporaryFile()
1644    self.assertRaises(common.ExternalError, common._GenerateGkiCertificate,
1645                      test_file.name, 'generic_kernel')
1646
1647  def test_GenerateGkiCertificate_SearchKeyPathNotFound(self):
1648    pubkey = 'no_testkey_gki.pem'
1649    self.assertFalse(os.path.exists(pubkey))
1650
1651    # Tests it should raise ExternalError if no key found under
1652    # OPTIONS.search_path.
1653    search_path_dir = common.MakeTempDir()
1654    search_pubkey = os.path.join(search_path_dir, pubkey)
1655    self.assertFalse(os.path.exists(search_pubkey))
1656
1657    common.OPTIONS.search_path = search_path_dir
1658    common.OPTIONS.info_dict = {
1659        'gki_signing_key_path': pubkey,
1660        'gki_signing_algorithm': 'SHA256_RSA4096',
1661        'gki_signing_signature_args': '--prop foo:bar',
1662    }
1663    test_file = tempfile.NamedTemporaryFile()
1664    self.assertRaises(common.ExternalError, common._GenerateGkiCertificate,
1665                      test_file.name, 'generic_kernel')
1666
1667class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
1668  """Checks the format of install-recovery.sh.
1669
1670  Its format should match between common.py and validate_target_files.py.
1671  """
1672
1673  def setUp(self):
1674    self._tempdir = common.MakeTempDir()
1675    # Create a fake dict that contains the fstab info for boot&recovery.
1676    self._info = {"fstab" : {}}
1677    fake_fstab = [
1678        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
1679        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
1680    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
1681    # Construct the gzipped recovery.img and boot.img
1682    self.recovery_data = bytearray([
1683        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
1684        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
1685        0x08, 0x00, 0x00, 0x00
1686    ])
1687    # echo -n "boot" | gzip -f | hd
1688    self.boot_data = bytearray([
1689        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
1690        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
1691    ])
1692
1693  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
1694    loc = os.path.join(self._tempdir, prefix, name)
1695    if not os.path.exists(os.path.dirname(loc)):
1696      os.makedirs(os.path.dirname(loc))
1697    with open(loc, "wb") as f:
1698      f.write(data)
1699
1700  def test_full_recovery(self):
1701    recovery_image = common.File("recovery.img", self.recovery_data)
1702    boot_image = common.File("boot.img", self.boot_data)
1703    self._info["full_recovery_image"] = "true"
1704
1705    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1706                             recovery_image, boot_image, self._info)
1707    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1708                                                        self._info)
1709
1710  @test_utils.SkipIfExternalToolsUnavailable()
1711  def test_recovery_from_boot(self):
1712    recovery_image = common.File("recovery.img", self.recovery_data)
1713    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
1714    boot_image = common.File("boot.img", self.boot_data)
1715    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
1716
1717    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1718                             recovery_image, boot_image, self._info)
1719    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1720                                                        self._info)
1721    # Validate 'recovery-from-boot' with bonus argument.
1722    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
1723    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1724                             recovery_image, boot_image, self._info)
1725    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1726                                                        self._info)
1727
1728
1729class MockBlockDifference(object):
1730
1731  def __init__(self, partition, tgt, src=None):
1732    self.partition = partition
1733    self.tgt = tgt
1734    self.src = src
1735
1736  def WriteScript(self, script, _, progress=None,
1737                  write_verify_script=False):
1738    if progress:
1739      script.AppendExtra("progress({})".format(progress))
1740    script.AppendExtra("patch({});".format(self.partition))
1741    if write_verify_script:
1742      self.WritePostInstallVerifyScript(script)
1743
1744  def WritePostInstallVerifyScript(self, script):
1745    script.AppendExtra("verify({});".format(self.partition))
1746
1747
1748class FakeSparseImage(object):
1749
1750  def __init__(self, size):
1751    self.blocksize = 4096
1752    self.total_blocks = size // 4096
1753    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
1754
1755
1756class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
1757
1758  @staticmethod
1759  def get_op_list(output_path):
1760    with zipfile.ZipFile(output_path, allowZip64=True) as output_zip:
1761      with output_zip.open('dynamic_partitions_op_list') as op_list:
1762        return [line.decode().strip() for line in op_list.readlines()
1763                if not line.startswith(b'#')]
1764
1765  def setUp(self):
1766    self.script = test_utils.MockScriptWriter()
1767    self.output_path = common.MakeTempFile(suffix='.zip')
1768
1769  def test_full(self):
1770    target_info = common.LoadDictionaryFromLines("""
1771dynamic_partition_list=system vendor
1772super_partition_groups=group_foo
1773super_group_foo_group_size={group_size}
1774super_group_foo_partition_list=system vendor
1775""".format(group_size=4 * GiB).split("\n"))
1776    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
1777                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
1778
1779    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
1780    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1781      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1782
1783    self.assertEqual(str(self.script).strip(), """
1784assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
1785patch(system);
1786verify(system);
1787unmap_partition("system");
1788patch(vendor);
1789verify(vendor);
1790unmap_partition("vendor");
1791""".strip())
1792
1793    lines = self.get_op_list(self.output_path)
1794
1795    remove_all_groups = lines.index("remove_all_groups")
1796    add_group = lines.index("add_group group_foo 4294967296")
1797    add_vendor = lines.index("add vendor group_foo")
1798    add_system = lines.index("add system group_foo")
1799    resize_vendor = lines.index("resize vendor 1073741824")
1800    resize_system = lines.index("resize system 3221225472")
1801
1802    self.assertLess(remove_all_groups, add_group,
1803                    "Should add groups after removing all groups")
1804    self.assertLess(add_group, min(add_vendor, add_system),
1805                    "Should add partitions after adding group")
1806    self.assertLess(add_system, resize_system,
1807                    "Should resize system after adding it")
1808    self.assertLess(add_vendor, resize_vendor,
1809                    "Should resize vendor after adding it")
1810
1811  def test_inc_groups(self):
1812    source_info = common.LoadDictionaryFromLines("""
1813super_partition_groups=group_foo group_bar group_baz
1814super_group_foo_group_size={group_foo_size}
1815super_group_bar_group_size={group_bar_size}
1816""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
1817    target_info = common.LoadDictionaryFromLines("""
1818super_partition_groups=group_foo group_baz group_qux
1819super_group_foo_group_size={group_foo_size}
1820super_group_baz_group_size={group_baz_size}
1821super_group_qux_group_size={group_qux_size}
1822""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
1823           group_qux_size=1 * GiB).split("\n"))
1824
1825    dp_diff = common.DynamicPartitionsDifference(target_info,
1826                                                 block_diffs=[],
1827                                                 source_info_dict=source_info)
1828    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1829      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1830
1831    lines = self.get_op_list(self.output_path)
1832
1833    removed = lines.index("remove_group group_bar")
1834    shrunk = lines.index("resize_group group_foo 3221225472")
1835    grown = lines.index("resize_group group_baz 4294967296")
1836    added = lines.index("add_group group_qux 1073741824")
1837
1838    self.assertLess(max(removed, shrunk),
1839                    min(grown, added),
1840                    "ops that remove / shrink partitions must precede ops that "
1841                    "grow / add partitions")
1842
1843  def test_incremental(self):
1844    source_info = common.LoadDictionaryFromLines("""
1845dynamic_partition_list=system vendor product system_ext
1846super_partition_groups=group_foo
1847super_group_foo_group_size={group_foo_size}
1848super_group_foo_partition_list=system vendor product system_ext
1849""".format(group_foo_size=4 * GiB).split("\n"))
1850    target_info = common.LoadDictionaryFromLines("""
1851dynamic_partition_list=system vendor product odm
1852super_partition_groups=group_foo group_bar
1853super_group_foo_group_size={group_foo_size}
1854super_group_foo_partition_list=system vendor odm
1855super_group_bar_group_size={group_bar_size}
1856super_group_bar_partition_list=product
1857""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
1858
1859    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
1860                                       src=FakeSparseImage(1024 * MiB)),
1861                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
1862                                       src=FakeSparseImage(1024 * MiB)),
1863                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
1864                                       src=FakeSparseImage(1024 * MiB)),
1865                   MockBlockDifference("system_ext", None,
1866                                       src=FakeSparseImage(1024 * MiB)),
1867                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
1868                                       src=None)]
1869
1870    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1871                                                 source_info_dict=source_info)
1872    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1873      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1874
1875    metadata_idx = self.script.lines.index(
1876        'assert(update_dynamic_partitions(package_extract_file('
1877        '"dynamic_partitions_op_list")));')
1878    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
1879    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
1880    for p in ("product", "system", "odm"):
1881      patch_idx = self.script.lines.index("patch({});".format(p))
1882      verify_idx = self.script.lines.index("verify({});".format(p))
1883      self.assertLess(metadata_idx, patch_idx,
1884                      "Should patch {} after updating metadata".format(p))
1885      self.assertLess(patch_idx, verify_idx,
1886                      "Should verify {} after patching".format(p))
1887
1888    self.assertNotIn("patch(system_ext);", self.script.lines)
1889
1890    lines = self.get_op_list(self.output_path)
1891
1892    remove = lines.index("remove system_ext")
1893    move_product_out = lines.index("move product default")
1894    shrink = lines.index("resize vendor 536870912")
1895    shrink_group = lines.index("resize_group group_foo 3221225472")
1896    add_group_bar = lines.index("add_group group_bar 1073741824")
1897    add_odm = lines.index("add odm group_foo")
1898    grow_existing = lines.index("resize system 1610612736")
1899    grow_added = lines.index("resize odm 1073741824")
1900    move_product_in = lines.index("move product group_bar")
1901
1902    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
1903    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
1904
1905    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
1906                    "Must shrink group after partitions inside group are shrunk"
1907                    " / removed")
1908
1909    self.assertLess(add_group_bar, move_product_in,
1910                    "Must add partitions to group after group is added")
1911
1912    self.assertLess(max_idx_move_partition_out_foo,
1913                    min_idx_move_partition_in_foo,
1914                    "Must shrink partitions / remove partitions from group"
1915                    "before adding / moving partitions into group")
1916
1917  def test_remove_partition(self):
1918    source_info = common.LoadDictionaryFromLines("""
1919blockimgdiff_versions=3,4
1920use_dynamic_partitions=true
1921dynamic_partition_list=foo
1922super_partition_groups=group_foo
1923super_group_foo_group_size={group_foo_size}
1924super_group_foo_partition_list=foo
1925""".format(group_foo_size=4 * GiB).split("\n"))
1926    target_info = common.LoadDictionaryFromLines("""
1927blockimgdiff_versions=3,4
1928use_dynamic_partitions=true
1929super_partition_groups=group_foo
1930super_group_foo_group_size={group_foo_size}
1931""".format(group_foo_size=4 * GiB).split("\n"))
1932
1933    common.OPTIONS.info_dict = target_info
1934    common.OPTIONS.target_info_dict = target_info
1935    common.OPTIONS.source_info_dict = source_info
1936    common.OPTIONS.cache_size = 4 * 4096
1937
1938    block_diffs = [common.BlockDifference("foo", EmptyImage(),
1939                                          src=DataImage("source", pad=True))]
1940
1941    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1942                                                 source_info_dict=source_info)
1943    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1944      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1945
1946    self.assertNotIn("block_image_update", str(self.script),
1947                     "Removed partition should not be patched.")
1948
1949    lines = self.get_op_list(self.output_path)
1950    self.assertEqual(lines, ["remove foo"])
1951
1952
1953class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
1954  def setUp(self):
1955    self.odm_build_prop = [
1956        'ro.odm.build.date.utc=1578430045',
1957        'ro.odm.build.fingerprint='
1958        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1959        'ro.product.odm.device=coral',
1960        'import /odm/etc/build_${ro.boot.product.device_name}.prop',
1961    ]
1962
1963  @staticmethod
1964  def _BuildZipFile(entries):
1965    input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1966    with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip:
1967      for name, content in entries.items():
1968        input_zip.writestr(name, content)
1969
1970    return input_file
1971
1972  def test_parseBuildProps_noImportStatement(self):
1973    build_prop = [
1974        'ro.odm.build.date.utc=1578430045',
1975        'ro.odm.build.fingerprint='
1976        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1977        'ro.product.odm.device=coral',
1978    ]
1979    input_file = self._BuildZipFile({
1980        'ODM/etc/build.prop': '\n'.join(build_prop),
1981    })
1982
1983    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
1984      placeholder_values = {
1985          'ro.boot.product.device_name': ['std', 'pro']
1986      }
1987      partition_props = common.PartitionBuildProps.FromInputFile(
1988          input_zip, 'odm', placeholder_values)
1989
1990    self.assertEqual({
1991        'ro.odm.build.date.utc': '1578430045',
1992        'ro.odm.build.fingerprint':
1993        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1994        'ro.product.odm.device': 'coral',
1995    }, partition_props.build_props)
1996
1997    self.assertEqual(set(), partition_props.prop_overrides)
1998
1999  def test_parseBuildProps_singleImportStatement(self):
2000    build_std_prop = [
2001        'ro.product.odm.device=coral',
2002        'ro.product.odm.name=product1',
2003    ]
2004    build_pro_prop = [
2005        'ro.product.odm.device=coralpro',
2006        'ro.product.odm.name=product2',
2007    ]
2008
2009    input_file = self._BuildZipFile({
2010        'ODM/etc/build.prop': '\n'.join(self.odm_build_prop),
2011        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2012        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2013    })
2014
2015    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2016      placeholder_values = {
2017          'ro.boot.product.device_name': 'std'
2018      }
2019      partition_props = common.PartitionBuildProps.FromInputFile(
2020          input_zip, 'odm', placeholder_values)
2021
2022    self.assertEqual({
2023      'ro.odm.build.date.utc': '1578430045',
2024      'ro.odm.build.fingerprint':
2025      'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2026      'ro.product.odm.device': 'coral',
2027      'ro.product.odm.name': 'product1',
2028    }, partition_props.build_props)
2029
2030    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2031      placeholder_values = {
2032          'ro.boot.product.device_name': 'pro'
2033      }
2034      partition_props = common.PartitionBuildProps.FromInputFile(
2035          input_zip, 'odm', placeholder_values)
2036
2037    self.assertEqual({
2038        'ro.odm.build.date.utc': '1578430045',
2039        'ro.odm.build.fingerprint':
2040        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2041        'ro.product.odm.device': 'coralpro',
2042        'ro.product.odm.name': 'product2',
2043    }, partition_props.build_props)
2044
2045  def test_parseBuildProps_noPlaceHolders(self):
2046    build_prop = copy.copy(self.odm_build_prop)
2047    input_file = self._BuildZipFile({
2048        'ODM/etc/build.prop': '\n'.join(build_prop),
2049    })
2050
2051    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2052      partition_props = common.PartitionBuildProps.FromInputFile(
2053          input_zip, 'odm')
2054
2055    self.assertEqual({
2056        'ro.odm.build.date.utc': '1578430045',
2057        'ro.odm.build.fingerprint':
2058        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2059        'ro.product.odm.device': 'coral',
2060    }, partition_props.build_props)
2061
2062    self.assertEqual(set(), partition_props.prop_overrides)
2063
2064  def test_parseBuildProps_multipleImportStatements(self):
2065    build_prop = copy.deepcopy(self.odm_build_prop)
2066    build_prop.append(
2067        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2068
2069    build_std_prop = [
2070        'ro.product.odm.device=coral',
2071    ]
2072    build_pro_prop = [
2073        'ro.product.odm.device=coralpro',
2074    ]
2075
2076    product1_prop = [
2077        'ro.product.odm.name=product1',
2078        'ro.product.not_care=not_care',
2079    ]
2080
2081    product2_prop = [
2082        'ro.product.odm.name=product2',
2083        'ro.product.not_care=not_care',
2084    ]
2085
2086    input_file = self._BuildZipFile({
2087        'ODM/etc/build.prop': '\n'.join(build_prop),
2088        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2089        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2090        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2091        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2092    })
2093
2094    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2095      placeholder_values = {
2096          'ro.boot.product.device_name': 'std',
2097          'ro.boot.product.product_name': 'product1',
2098          'ro.boot.product.not_care': 'not_care',
2099      }
2100      partition_props = common.PartitionBuildProps.FromInputFile(
2101          input_zip, 'odm', placeholder_values)
2102
2103    self.assertEqual({
2104        'ro.odm.build.date.utc': '1578430045',
2105        'ro.odm.build.fingerprint':
2106        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2107        'ro.product.odm.device': 'coral',
2108        'ro.product.odm.name': 'product1'
2109    }, partition_props.build_props)
2110
2111    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2112      placeholder_values = {
2113          'ro.boot.product.device_name': 'pro',
2114          'ro.boot.product.product_name': 'product2',
2115          'ro.boot.product.not_care': 'not_care',
2116      }
2117      partition_props = common.PartitionBuildProps.FromInputFile(
2118          input_zip, 'odm', placeholder_values)
2119
2120    self.assertEqual({
2121        'ro.odm.build.date.utc': '1578430045',
2122        'ro.odm.build.fingerprint':
2123        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2124        'ro.product.odm.device': 'coralpro',
2125        'ro.product.odm.name': 'product2'
2126    }, partition_props.build_props)
2127
2128  def test_parseBuildProps_defineAfterOverride(self):
2129    build_prop = copy.deepcopy(self.odm_build_prop)
2130    build_prop.append('ro.product.odm.device=coral')
2131
2132    build_std_prop = [
2133        'ro.product.odm.device=coral',
2134    ]
2135    build_pro_prop = [
2136        'ro.product.odm.device=coralpro',
2137    ]
2138
2139    input_file = self._BuildZipFile({
2140        'ODM/etc/build.prop': '\n'.join(build_prop),
2141        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2142        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2143    })
2144
2145    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2146      placeholder_values = {
2147          'ro.boot.product.device_name': 'std',
2148      }
2149
2150      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2151                        input_zip, 'odm', placeholder_values)
2152
2153  def test_parseBuildProps_duplicateOverride(self):
2154    build_prop = copy.deepcopy(self.odm_build_prop)
2155    build_prop.append(
2156        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2157
2158    build_std_prop = [
2159        'ro.product.odm.device=coral',
2160        'ro.product.odm.name=product1',
2161    ]
2162    build_pro_prop = [
2163        'ro.product.odm.device=coralpro',
2164    ]
2165
2166    product1_prop = [
2167        'ro.product.odm.name=product1',
2168    ]
2169
2170    product2_prop = [
2171        'ro.product.odm.name=product2',
2172    ]
2173
2174    input_file = self._BuildZipFile({
2175        'ODM/etc/build.prop': '\n'.join(build_prop),
2176        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2177        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2178        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2179        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2180    })
2181
2182    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2183      placeholder_values = {
2184          'ro.boot.product.device_name': 'std',
2185          'ro.boot.product.product_name': 'product1',
2186      }
2187      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2188                        input_zip, 'odm', placeholder_values)
2189