• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import json
19import os
20import subprocess
21import tempfile
22import time
23import unittest
24import zipfile
25from hashlib import sha1
26
27import common
28import test_utils
29import validate_target_files
30from images import EmptyImage, DataImage
31from rangelib import RangeSet
32
33
34KiB = 1024
35MiB = 1024 * KiB
36GiB = 1024 * MiB
37
38
39def get_2gb_string():
40  size = int(2 * GiB + 1)
41  block_size = 4 * KiB
42  step_size = 4 * MiB
43  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
44  for _ in range(0, size, step_size):
45    yield os.urandom(block_size)
46    yield b'\0' * (step_size - block_size)
47
48
49class BuildInfoTest(test_utils.ReleaseToolsTestCase):
50
51  TEST_INFO_FINGERPRINT_DICT = {
52      'build.prop': common.PartitionBuildProps.FromDictionary(
53          'system', {
54              'ro.product.brand': 'product-brand',
55              'ro.product.name': 'product-name',
56              'ro.product.device': 'product-device',
57              'ro.build.version.release': 'version-release',
58              'ro.build.id': 'build-id',
59              'ro.build.version.incremental': 'version-incremental',
60              'ro.build.type': 'build-type',
61              'ro.build.tags': 'build-tags',
62              'ro.build.version.sdk': 30,
63          }
64      ),
65  }
66
67  TEST_INFO_DICT = {
68      'build.prop': common.PartitionBuildProps.FromDictionary(
69          'system', {
70              'ro.product.device': 'product-device',
71              'ro.product.name': 'product-name',
72              'ro.build.fingerprint': 'build-fingerprint',
73              'ro.build.foo': 'build-foo'}
74      ),
75      'system.build.prop': common.PartitionBuildProps.FromDictionary(
76          'system', {
77              'ro.product.system.brand': 'product-brand',
78              'ro.product.system.name': 'product-name',
79              'ro.product.system.device': 'product-device',
80              'ro.system.build.version.release': 'version-release',
81              'ro.system.build.id': 'build-id',
82              'ro.system.build.version.incremental': 'version-incremental',
83              'ro.system.build.type': 'build-type',
84              'ro.system.build.tags': 'build-tags',
85              'ro.system.build.foo': 'build-foo'}
86      ),
87      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
88          'vendor', {
89              'ro.product.vendor.brand': 'vendor-product-brand',
90              'ro.product.vendor.name': 'vendor-product-name',
91              'ro.product.vendor.device': 'vendor-product-device',
92              'ro.vendor.build.version.release': 'vendor-version-release',
93              'ro.vendor.build.id': 'vendor-build-id',
94              'ro.vendor.build.version.incremental':
95              'vendor-version-incremental',
96              'ro.vendor.build.type': 'vendor-build-type',
97              'ro.vendor.build.tags': 'vendor-build-tags'}
98      ),
99      'property1': 'value1',
100      'property2': 4096,
101  }
102
103  TEST_INFO_DICT_USES_OEM_PROPS = {
104      'build.prop': common.PartitionBuildProps.FromDictionary(
105          'system', {
106              'ro.product.name': 'product-name',
107              'ro.build.thumbprint': 'build-thumbprint',
108              'ro.build.bar': 'build-bar'}
109      ),
110      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
111          'vendor', {
112              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
113      ),
114      'property1': 'value1',
115      'property2': 4096,
116      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
117  }
118
119  TEST_OEM_DICTS = [
120      {
121          'ro.product.brand': 'brand1',
122          'ro.product.device': 'device1',
123      },
124      {
125          'ro.product.brand': 'brand2',
126          'ro.product.device': 'device2',
127      },
128      {
129          'ro.product.brand': 'brand3',
130          'ro.product.device': 'device3',
131      },
132  ]
133
134  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = {
135      'build.prop': common.PartitionBuildProps.FromDictionary(
136          'system', {
137              'ro.build.fingerprint': 'build-fingerprint',
138              'ro.product.property_source_order':
139                  'product,odm,vendor,system_ext,system'}
140      ),
141      'system.build.prop': common.PartitionBuildProps.FromDictionary(
142          'system', {
143              'ro.product.system.device': 'system-product-device'}
144      ),
145      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
146          'vendor', {
147              'ro.product.vendor.device': 'vendor-product-device'}
148      ),
149  }
150
151  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = {
152      'build.prop': common.PartitionBuildProps.FromDictionary(
153          'system', {
154              'ro.build.fingerprint': 'build-fingerprint',
155              'ro.product.property_source_order':
156                  'product,product_services,odm,vendor,system',
157              'ro.build.version.release': '10',
158              'ro.build.version.codename': 'REL'}
159      ),
160      'system.build.prop': common.PartitionBuildProps.FromDictionary(
161          'system', {
162              'ro.product.system.device': 'system-product-device'}
163      ),
164      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
165          'vendor', {
166              'ro.product.vendor.device': 'vendor-product-device'}
167      ),
168  }
169
170  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = {
171      'build.prop': common.PartitionBuildProps.FromDictionary(
172          'system', {
173              'ro.product.device': 'product-device',
174              'ro.build.fingerprint': 'build-fingerprint',
175              'ro.build.version.release': '9',
176              'ro.build.version.codename': 'REL'}
177      ),
178      'system.build.prop': common.PartitionBuildProps.FromDictionary(
179          'system', {
180              'ro.product.system.device': 'system-product-device'}
181      ),
182      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
183          'vendor', {
184              'ro.product.vendor.device': 'vendor-product-device'}
185      ),
186  }
187
188  def test_init(self):
189    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
190    self.assertEqual('product-device', target_info.device)
191    self.assertEqual('build-fingerprint', target_info.fingerprint)
192    self.assertFalse(target_info.is_ab)
193    self.assertIsNone(target_info.oem_props)
194
195  def test_init_with_oem_props(self):
196    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
197                                   self.TEST_OEM_DICTS)
198    self.assertEqual('device1', target_info.device)
199    self.assertEqual('brand1/product-name/device1:build-thumbprint',
200                     target_info.fingerprint)
201
202    # Swap the order in oem_dicts, which would lead to different BuildInfo.
203    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
204    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
205    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
206                                   oem_dicts)
207    self.assertEqual('device3', target_info.device)
208    self.assertEqual('brand3/product-name/device3:build-thumbprint',
209                     target_info.fingerprint)
210
211  def test_init_badFingerprint(self):
212    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
213    info_dict['build.prop'].build_props[
214        'ro.build.fingerprint'] = 'bad fingerprint'
215    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
216
217    info_dict['build.prop'].build_props[
218        'ro.build.fingerprint'] = 'bad\x80fingerprint'
219    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
220
221  def test_init_goodFingerprint(self):
222    info_dict = copy.deepcopy(self.TEST_INFO_FINGERPRINT_DICT)
223    build_info = common.BuildInfo(info_dict)
224    self.assertEqual(
225      'product-brand/product-name/product-device:version-release/build-id/'
226      'version-incremental:build-type/build-tags', build_info.fingerprint)
227
228    build_props = info_dict['build.prop'].build_props
229    del build_props['ro.build.id']
230    build_props['ro.build.legacy.id'] = 'legacy-build-id'
231    build_info = common.BuildInfo(info_dict, use_legacy_id=True)
232    self.assertEqual(
233      'product-brand/product-name/product-device:version-release/'
234      'legacy-build-id/version-incremental:build-type/build-tags',
235      build_info.fingerprint)
236
237    self.assertRaises(common.ExternalError, common.BuildInfo, info_dict, None,
238                      False)
239
240    info_dict['avb_enable'] = 'true'
241    info_dict['vbmeta_digest'] = 'abcde12345'
242    build_info = common.BuildInfo(info_dict, use_legacy_id=False)
243    self.assertEqual(
244      'product-brand/product-name/product-device:version-release/'
245      'legacy-build-id.abcde123/version-incremental:build-type/build-tags',
246      build_info.fingerprint)
247
248  def test___getitem__(self):
249    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
250    self.assertEqual('value1', target_info['property1'])
251    self.assertEqual(4096, target_info['property2'])
252    self.assertEqual('build-foo',
253                     target_info['build.prop'].GetProp('ro.build.foo'))
254
255  def test___getitem__with_oem_props(self):
256    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
257                                   self.TEST_OEM_DICTS)
258    self.assertEqual('value1', target_info['property1'])
259    self.assertEqual(4096, target_info['property2'])
260    self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo'))
261
262  def test___setitem__(self):
263    target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
264    self.assertEqual('value1', target_info['property1'])
265    target_info['property1'] = 'value2'
266    self.assertEqual('value2', target_info['property1'])
267
268    self.assertEqual('build-foo',
269                     target_info['build.prop'].GetProp('ro.build.foo'))
270    target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar'
271    self.assertEqual('build-bar',
272                     target_info['build.prop'].GetProp('ro.build.foo'))
273
274  def test_get(self):
275    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
276    self.assertEqual('value1', target_info.get('property1'))
277    self.assertEqual(4096, target_info.get('property2'))
278    self.assertEqual(4096, target_info.get('property2', 1024))
279    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
280    self.assertEqual('build-foo',
281                     target_info.get('build.prop').GetProp('ro.build.foo'))
282
283  def test_get_with_oem_props(self):
284    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
285                                   self.TEST_OEM_DICTS)
286    self.assertEqual('value1', target_info.get('property1'))
287    self.assertEqual(4096, target_info.get('property2'))
288    self.assertEqual(4096, target_info.get('property2', 1024))
289    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
290    self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo'))
291
292  def test_items(self):
293    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
294    items = target_info.items()
295    self.assertIn(('property1', 'value1'), items)
296    self.assertIn(('property2', 4096), items)
297
298  def test_GetBuildProp(self):
299    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
300    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
301    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
302                      'ro.build.nonexistent')
303
304  def test_GetBuildProp_with_oem_props(self):
305    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
306                                   self.TEST_OEM_DICTS)
307    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
308    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
309                      'ro.build.nonexistent')
310
311  def test_GetPartitionFingerprint(self):
312    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
313    self.assertEqual(
314        target_info.GetPartitionFingerprint('vendor'),
315        'vendor-product-brand/vendor-product-name/vendor-product-device'
316        ':vendor-version-release/vendor-build-id/vendor-version-incremental'
317        ':vendor-build-type/vendor-build-tags')
318
319  def test_GetPartitionFingerprint_system_other_uses_system(self):
320    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
321    self.assertEqual(
322        target_info.GetPartitionFingerprint('system_other'),
323        target_info.GetPartitionFingerprint('system'))
324
325  def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self):
326    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
327    info_dict['vendor.build.prop'].build_props[
328        'ro.vendor.build.fingerprint'] = 'vendor:fingerprint'
329    target_info = common.BuildInfo(info_dict, None)
330    self.assertEqual(
331        target_info.GetPartitionFingerprint('vendor'),
332        'vendor:fingerprint')
333
334  def test_WriteMountOemScript(self):
335    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
336                                   self.TEST_OEM_DICTS)
337    script_writer = test_utils.MockScriptWriter()
338    target_info.WriteMountOemScript(script_writer)
339    self.assertEqual([('Mount', '/oem', None)], script_writer.lines)
340
341  def test_WriteDeviceAssertions(self):
342    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
343    script_writer = test_utils.MockScriptWriter()
344    target_info.WriteDeviceAssertions(script_writer, False)
345    self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines)
346
347  def test_WriteDeviceAssertions_with_oem_props(self):
348    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
349                                   self.TEST_OEM_DICTS)
350    script_writer = test_utils.MockScriptWriter()
351    target_info.WriteDeviceAssertions(script_writer, False)
352    self.assertEqual(
353        [
354            ('AssertOemProperty', 'ro.product.device',
355             ['device1', 'device2', 'device3'], False),
356            ('AssertOemProperty', 'ro.product.brand',
357             ['brand1', 'brand2', 'brand3'], False),
358        ],
359        script_writer.lines)
360
361  def test_ResolveRoProductProperty_FromVendor(self):
362    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
363    info = common.BuildInfo(info_dict, None)
364    self.assertEqual('vendor-product-device',
365                     info.GetBuildProp('ro.product.device'))
366
367  def test_ResolveRoProductProperty_FromSystem(self):
368    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
369    del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device']
370    info = common.BuildInfo(info_dict, None)
371    self.assertEqual('system-product-device',
372                     info.GetBuildProp('ro.product.device'))
373
374  def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self):
375    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
376    info_dict['build.prop'].build_props[
377        'ro.product.property_source_order'] = 'bad-source'
378    with self.assertRaisesRegexp(common.ExternalError,
379        'Invalid ro.product.property_source_order'):
380      info = common.BuildInfo(info_dict, None)
381      info.GetBuildProp('ro.product.device')
382
383  def test_ResolveRoProductProperty_Android10PropertySearchOrder(self):
384    info_dict = copy.deepcopy(
385        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10)
386    info = common.BuildInfo(info_dict, None)
387    self.assertEqual('vendor-product-device',
388                     info.GetBuildProp('ro.product.device'))
389
390  def test_ResolveRoProductProperty_Android9PropertySearchOrder(self):
391    info_dict = copy.deepcopy(
392        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9)
393    info = common.BuildInfo(info_dict, None)
394    self.assertEqual('product-device',
395                     info.GetBuildProp('ro.product.device'))
396
397
398class CommonZipTest(test_utils.ReleaseToolsTestCase):
399
400  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
401              test_file_name=None, expected_stat=None, expected_mode=0o644,
402              expected_compress_type=zipfile.ZIP_STORED):
403    # Verify the stat if present.
404    if test_file_name is not None:
405      new_stat = os.stat(test_file_name)
406      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
407      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
408
409    # Reopen the zip file to verify.
410    zip_file = zipfile.ZipFile(zip_file_name, "r", allowZip64=True)
411
412    # Verify the timestamp.
413    info = zip_file.getinfo(arcname)
414    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
415
416    # Verify the file mode.
417    mode = (info.external_attr >> 16) & 0o777
418    self.assertEqual(mode, expected_mode)
419
420    # Verify the compress type.
421    self.assertEqual(info.compress_type, expected_compress_type)
422
423    # Verify the zip contents.
424    entry = zip_file.open(arcname)
425    sha1_hash = sha1()
426    for chunk in iter(lambda: entry.read(4 * MiB), b''):
427      sha1_hash.update(chunk)
428    self.assertEqual(expected_hash, sha1_hash.hexdigest())
429    self.assertIsNone(zip_file.testzip())
430
431  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
432    extra_zipwrite_args = dict(extra_zipwrite_args or {})
433
434    test_file = tempfile.NamedTemporaryFile(delete=False)
435    test_file_name = test_file.name
436
437    zip_file = tempfile.NamedTemporaryFile(delete=False)
438    zip_file_name = zip_file.name
439
440    # File names within an archive strip the leading slash.
441    arcname = extra_zipwrite_args.get("arcname", test_file_name)
442    if arcname[0] == "/":
443      arcname = arcname[1:]
444
445    zip_file.close()
446    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
447
448    try:
449      sha1_hash = sha1()
450      for data in contents:
451        sha1_hash.update(bytes(data))
452        test_file.write(bytes(data))
453      test_file.close()
454
455      expected_stat = os.stat(test_file_name)
456      expected_mode = extra_zipwrite_args.get("perms", 0o644)
457      expected_compress_type = extra_zipwrite_args.get("compress_type",
458                                                       zipfile.ZIP_STORED)
459      time.sleep(5)  # Make sure the atime/mtime will change measurably.
460
461      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
462      common.ZipClose(zip_file)
463
464      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
465                   test_file_name, expected_stat, expected_mode,
466                   expected_compress_type)
467    finally:
468      os.remove(test_file_name)
469      os.remove(zip_file_name)
470
471  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
472    extra_args = dict(extra_args or {})
473
474    zip_file = tempfile.NamedTemporaryFile(delete=False)
475    zip_file_name = zip_file.name
476    zip_file.close()
477
478    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
479
480    try:
481      expected_compress_type = extra_args.get("compress_type",
482                                              zipfile.ZIP_STORED)
483      time.sleep(5)  # Make sure the atime/mtime will change measurably.
484
485      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
486        arcname = zinfo_or_arcname
487        expected_mode = extra_args.get("perms", 0o644)
488      else:
489        arcname = zinfo_or_arcname.filename
490        if zinfo_or_arcname.external_attr:
491          zinfo_perms = zinfo_or_arcname.external_attr >> 16
492        else:
493          zinfo_perms = 0o600
494        expected_mode = extra_args.get("perms", zinfo_perms)
495
496      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
497      common.ZipClose(zip_file)
498
499      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
500                   expected_mode=expected_mode,
501                   expected_compress_type=expected_compress_type)
502    finally:
503      os.remove(zip_file_name)
504
505  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
506    extra_args = dict(extra_args or {})
507
508    zip_file = tempfile.NamedTemporaryFile(delete=False)
509    zip_file_name = zip_file.name
510
511    test_file = tempfile.NamedTemporaryFile(delete=False)
512    test_file_name = test_file.name
513
514    arcname_large = test_file_name
515    arcname_small = "bar"
516
517    # File names within an archive strip the leading slash.
518    if arcname_large[0] == "/":
519      arcname_large = arcname_large[1:]
520
521    zip_file.close()
522    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
523
524    try:
525      sha1_hash = sha1()
526      for data in large:
527        sha1_hash.update(data)
528        test_file.write(data)
529      test_file.close()
530
531      expected_stat = os.stat(test_file_name)
532      expected_mode = 0o644
533      expected_compress_type = extra_args.get("compress_type",
534                                              zipfile.ZIP_STORED)
535      time.sleep(5)  # Make sure the atime/mtime will change measurably.
536
537      common.ZipWrite(zip_file, test_file_name, **extra_args)
538      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
539      common.ZipClose(zip_file)
540
541      # Verify the contents written by ZipWrite().
542      self._verify(zip_file, zip_file_name, arcname_large,
543                   sha1_hash.hexdigest(), test_file_name, expected_stat,
544                   expected_mode, expected_compress_type)
545
546      # Verify the contents written by ZipWriteStr().
547      self._verify(zip_file, zip_file_name, arcname_small,
548                   sha1(small).hexdigest(),
549                   expected_compress_type=expected_compress_type)
550    finally:
551      os.remove(zip_file_name)
552      os.remove(test_file_name)
553
554  def _test_reset_ZIP64_LIMIT(self, func, *args):
555    default_limit = (1 << 31) - 1
556    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
557    func(*args)
558    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
559
560  def test_ZipWrite(self):
561    file_contents = os.urandom(1024)
562    self._test_ZipWrite(file_contents)
563
564  def test_ZipWrite_with_opts(self):
565    file_contents = os.urandom(1024)
566    self._test_ZipWrite(file_contents, {
567        "arcname": "foobar",
568        "perms": 0o777,
569        "compress_type": zipfile.ZIP_DEFLATED,
570    })
571    self._test_ZipWrite(file_contents, {
572        "arcname": "foobar",
573        "perms": 0o700,
574        "compress_type": zipfile.ZIP_STORED,
575    })
576
577  def test_ZipWrite_large_file(self):
578    file_contents = get_2gb_string()
579    self._test_ZipWrite(file_contents, {
580        "compress_type": zipfile.ZIP_DEFLATED,
581    })
582
583  def test_ZipWrite_resets_ZIP64_LIMIT(self):
584    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
585
586  def test_ZipWriteStr(self):
587    random_string = os.urandom(1024)
588    # Passing arcname
589    self._test_ZipWriteStr("foo", random_string)
590
591    # Passing zinfo
592    zinfo = zipfile.ZipInfo(filename="foo")
593    self._test_ZipWriteStr(zinfo, random_string)
594
595    # Timestamp in the zinfo should be overwritten.
596    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
597    self._test_ZipWriteStr(zinfo, random_string)
598
599  def test_ZipWriteStr_with_opts(self):
600    random_string = os.urandom(1024)
601    # Passing arcname
602    self._test_ZipWriteStr("foo", random_string, {
603        "perms": 0o700,
604        "compress_type": zipfile.ZIP_DEFLATED,
605    })
606    self._test_ZipWriteStr("bar", random_string, {
607        "compress_type": zipfile.ZIP_STORED,
608    })
609
610    # Passing zinfo
611    zinfo = zipfile.ZipInfo(filename="foo")
612    self._test_ZipWriteStr(zinfo, random_string, {
613        "compress_type": zipfile.ZIP_DEFLATED,
614    })
615    self._test_ZipWriteStr(zinfo, random_string, {
616        "perms": 0o600,
617        "compress_type": zipfile.ZIP_STORED,
618    })
619    self._test_ZipWriteStr(zinfo, random_string, {
620        "perms": 0o000,
621        "compress_type": zipfile.ZIP_STORED,
622    })
623
624  def test_ZipWriteStr_large_file(self):
625    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
626    # the workaround. We will only test the case of writing a string into a
627    # large archive.
628    long_string = get_2gb_string()
629    short_string = os.urandom(1024)
630    self._test_ZipWriteStr_large_file(long_string, short_string, {
631        "compress_type": zipfile.ZIP_DEFLATED,
632    })
633
634  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
635    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'')
636    zinfo = zipfile.ZipInfo(filename="foo")
637    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'')
638
639  def test_bug21309935(self):
640    zip_file = tempfile.NamedTemporaryFile(delete=False)
641    zip_file_name = zip_file.name
642    zip_file.close()
643
644    try:
645      random_string = os.urandom(1024)
646      zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
647      # Default perms should be 0o644 when passing the filename.
648      common.ZipWriteStr(zip_file, "foo", random_string)
649      # Honor the specified perms.
650      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
651      # The perms in zinfo should be untouched.
652      zinfo = zipfile.ZipInfo(filename="baz")
653      zinfo.external_attr = 0o740 << 16
654      common.ZipWriteStr(zip_file, zinfo, random_string)
655      # Explicitly specified perms has the priority.
656      zinfo = zipfile.ZipInfo(filename="qux")
657      zinfo.external_attr = 0o700 << 16
658      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
659      common.ZipClose(zip_file)
660
661      self._verify(zip_file, zip_file_name, "foo",
662                   sha1(random_string).hexdigest(),
663                   expected_mode=0o644)
664      self._verify(zip_file, zip_file_name, "bar",
665                   sha1(random_string).hexdigest(),
666                   expected_mode=0o755)
667      self._verify(zip_file, zip_file_name, "baz",
668                   sha1(random_string).hexdigest(),
669                   expected_mode=0o740)
670      self._verify(zip_file, zip_file_name, "qux",
671                   sha1(random_string).hexdigest(),
672                   expected_mode=0o400)
673    finally:
674      os.remove(zip_file_name)
675
676  @test_utils.SkipIfExternalToolsUnavailable()
677  def test_ZipDelete(self):
678    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
679    output_zip = zipfile.ZipFile(zip_file.name, 'w',
680                                 compression=zipfile.ZIP_DEFLATED)
681    with tempfile.NamedTemporaryFile() as entry_file:
682      entry_file.write(os.urandom(1024))
683      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
684      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
685      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
686      common.ZipClose(output_zip)
687    zip_file.close()
688
689    try:
690      common.ZipDelete(zip_file.name, 'Test2')
691      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
692        entries = check_zip.namelist()
693        self.assertTrue('Test1' in entries)
694        self.assertFalse('Test2' in entries)
695        self.assertTrue('Test3' in entries)
696
697      self.assertRaises(
698          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
699      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
700        entries = check_zip.namelist()
701        self.assertTrue('Test1' in entries)
702        self.assertFalse('Test2' in entries)
703        self.assertTrue('Test3' in entries)
704
705      common.ZipDelete(zip_file.name, ['Test3'])
706      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
707        entries = check_zip.namelist()
708        self.assertTrue('Test1' in entries)
709        self.assertFalse('Test2' in entries)
710        self.assertFalse('Test3' in entries)
711
712      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
713      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
714        entries = check_zip.namelist()
715        self.assertFalse('Test1' in entries)
716        self.assertFalse('Test2' in entries)
717        self.assertFalse('Test3' in entries)
718    finally:
719      os.remove(zip_file.name)
720
721  @staticmethod
722  def _test_UnzipTemp_createZipFile():
723    zip_file = common.MakeTempFile(suffix='.zip')
724    output_zip = zipfile.ZipFile(
725        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
726    contents = os.urandom(1024)
727    with tempfile.NamedTemporaryFile() as entry_file:
728      entry_file.write(contents)
729      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
730      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
731      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
732      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
733      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
734      common.ZipClose(output_zip)
735    common.ZipClose(output_zip)
736    return zip_file
737
738  @test_utils.SkipIfExternalToolsUnavailable()
739  def test_UnzipTemp(self):
740    zip_file = self._test_UnzipTemp_createZipFile()
741    unzipped_dir = common.UnzipTemp(zip_file)
742    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
743    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
744    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
745    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
746    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
747
748  @test_utils.SkipIfExternalToolsUnavailable()
749  def test_UnzipTemp_withPatterns(self):
750    zip_file = self._test_UnzipTemp_createZipFile()
751
752    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
753    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
754    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
755    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
756    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
757    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
758
759    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
760    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
761    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
762    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
763    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
764    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
765
766    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
767    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
768    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
769    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
770    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
771    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
772
773    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
774    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
775    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
776    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
777    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
778    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
779
780  def test_UnzipTemp_withEmptyPatterns(self):
781    zip_file = self._test_UnzipTemp_createZipFile()
782    unzipped_dir = common.UnzipTemp(zip_file, [])
783    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
784    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
785    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
786    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
787    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
788
789  @test_utils.SkipIfExternalToolsUnavailable()
790  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
791    zip_file = self._test_UnzipTemp_createZipFile()
792    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
793    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
794    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
795    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
796    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
797    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
798
799  def test_UnzipTemp_withNoMatchingPatterns(self):
800    zip_file = self._test_UnzipTemp_createZipFile()
801    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
802    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
803    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
804    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
805    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
806    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
807
808
809class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
810  """Tests the APK utils related functions."""
811
812  APKCERTS_TXT1 = (
813      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
814      ' private_key="certs/devkey.pk8"\n'
815      'name="Settings.apk"'
816      ' certificate="build/make/target/product/security/platform.x509.pem"'
817      ' private_key="build/make/target/product/security/platform.pk8"\n'
818      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
819  )
820
821  APKCERTS_CERTMAP1 = {
822      'RecoveryLocalizer.apk' : 'certs/devkey',
823      'Settings.apk' : 'build/make/target/product/security/platform',
824      'TV.apk' : 'PRESIGNED',
825  }
826
827  APKCERTS_TXT2 = (
828      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
829      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
830      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
831      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
832      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
833      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
834      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
835      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
836  )
837
838  APKCERTS_CERTMAP2 = {
839      'Compressed1.apk' : 'certs/compressed1',
840      'Compressed2a.apk' : 'certs/compressed2',
841      'Compressed2b.apk' : 'certs/compressed2',
842      'Compressed3.apk' : 'certs/compressed3',
843  }
844
845  APKCERTS_TXT3 = (
846      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
847      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
848  )
849
850  APKCERTS_CERTMAP3 = {
851      'Compressed4.apk' : 'certs/compressed4',
852  }
853
854  # Test parsing with no optional fields, both optional fields, and only the
855  # partition optional field.
856  APKCERTS_TXT4 = (
857      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
858      ' private_key="certs/devkey.pk8"\n'
859      'name="Settings.apk"'
860      ' certificate="build/make/target/product/security/platform.x509.pem"'
861      ' private_key="build/make/target/product/security/platform.pk8"'
862      ' compressed="gz" partition="system"\n'
863      'name="TV.apk" certificate="PRESIGNED" private_key=""'
864      ' partition="product"\n'
865  )
866
867  APKCERTS_CERTMAP4 = {
868      'RecoveryLocalizer.apk' : 'certs/devkey',
869      'Settings.apk' : 'build/make/target/product/security/platform',
870      'TV.apk' : 'PRESIGNED',
871  }
872
873  def setUp(self):
874    self.testdata_dir = test_utils.get_testdata_dir()
875
876  @staticmethod
877  def _write_apkcerts_txt(apkcerts_txt, additional=None):
878    if additional is None:
879      additional = []
880    target_files = common.MakeTempFile(suffix='.zip')
881    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
882      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
883      for entry in additional:
884        target_files_zip.writestr(entry, '')
885    return target_files
886
887  def test_ReadApkCerts_NoncompressedApks(self):
888    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
889    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
890      certmap, ext = common.ReadApkCerts(input_zip)
891
892    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
893    self.assertIsNone(ext)
894
895  def test_ReadApkCerts_CompressedApks(self):
896    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
897    # not stored in '.gz' format, so it shouldn't be considered as installed.
898    target_files = self._write_apkcerts_txt(
899        self.APKCERTS_TXT2,
900        ['Compressed1.apk.gz', 'Compressed3.apk'])
901
902    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
903      certmap, ext = common.ReadApkCerts(input_zip)
904
905    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
906    self.assertEqual('.gz', ext)
907
908    # Alternative case with '.xz'.
909    target_files = self._write_apkcerts_txt(
910        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
911
912    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
913      certmap, ext = common.ReadApkCerts(input_zip)
914
915    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
916    self.assertEqual('.xz', ext)
917
918  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
919    target_files = self._write_apkcerts_txt(
920        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
921        ['Compressed1.apk.gz', 'Compressed3.apk'])
922
923    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
924      certmap, ext = common.ReadApkCerts(input_zip)
925
926    certmap_merged = self.APKCERTS_CERTMAP1.copy()
927    certmap_merged.update(self.APKCERTS_CERTMAP2)
928    self.assertDictEqual(certmap_merged, certmap)
929    self.assertEqual('.gz', ext)
930
931  def test_ReadApkCerts_MultipleCompressionMethods(self):
932    target_files = self._write_apkcerts_txt(
933        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
934        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
935
936    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
937      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
938
939  def test_ReadApkCerts_MismatchingKeys(self):
940    malformed_apkcerts_txt = (
941        'name="App1.apk" certificate="certs/cert1.x509.pem"'
942        ' private_key="certs/cert2.pk8"\n'
943    )
944    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
945
946    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
947      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
948
949  def test_ReadApkCerts_WithWithoutOptionalFields(self):
950    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4)
951    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
952      certmap, ext = common.ReadApkCerts(input_zip)
953
954    self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap)
955    self.assertIsNone(ext)
956
957  def test_ExtractPublicKey(self):
958    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
959    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
960    with open(pubkey) as pubkey_fp:
961      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
962
963  def test_ExtractPublicKey_invalidInput(self):
964    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
965    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
966
967  @test_utils.SkipIfExternalToolsUnavailable()
968  def test_ExtractAvbPublicKey(self):
969    privkey = os.path.join(self.testdata_dir, 'testkey.key')
970    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
971    extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey)
972    extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey)
973    with open(extracted_from_privkey, 'rb') as privkey_fp, \
974        open(extracted_from_pubkey, 'rb') as pubkey_fp:
975      self.assertEqual(privkey_fp.read(), pubkey_fp.read())
976
977  def test_ParseCertificate(self):
978    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
979
980    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
981    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
982                      universal_newlines=False)
983    expected, _ = proc.communicate()
984    self.assertEqual(0, proc.returncode)
985
986    with open(cert) as cert_fp:
987      actual = common.ParseCertificate(cert_fp.read())
988    self.assertEqual(expected, actual)
989
990  @test_utils.SkipIfExternalToolsUnavailable()
991  def test_GetMinSdkVersion(self):
992    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
993    self.assertEqual('24', common.GetMinSdkVersion(test_app))
994
995  @test_utils.SkipIfExternalToolsUnavailable()
996  def test_GetMinSdkVersion_invalidInput(self):
997    self.assertRaises(
998        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
999
1000  @test_utils.SkipIfExternalToolsUnavailable()
1001  def test_GetMinSdkVersionInt(self):
1002    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
1003    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
1004
1005  @test_utils.SkipIfExternalToolsUnavailable()
1006  def test_GetMinSdkVersionInt_invalidInput(self):
1007    self.assertRaises(
1008        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
1009        {})
1010
1011
1012class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
1013
1014  def setUp(self):
1015    self.testdata_dir = test_utils.get_testdata_dir()
1016
1017  @test_utils.SkipIfExternalToolsUnavailable()
1018  def test_GetSparseImage_emptyBlockMapFile(self):
1019    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1020    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1021      target_files_zip.write(
1022          test_utils.construct_sparse_image([
1023              (0xCAC1, 6),
1024              (0xCAC3, 3),
1025              (0xCAC1, 4)]),
1026          arcname='IMAGES/system.img')
1027      target_files_zip.writestr('IMAGES/system.map', '')
1028      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1029      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1030
1031    tempdir = common.UnzipTemp(target_files)
1032    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1033      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1034
1035    self.assertDictEqual(
1036        {
1037            '__COPY': RangeSet("0"),
1038            '__NONZERO-0': RangeSet("1-5 9-12"),
1039        },
1040        sparse_image.file_map)
1041
1042  def test_PartitionMapFromTargetFiles(self):
1043    target_files_dir = common.MakeTempDir()
1044    os.makedirs(os.path.join(target_files_dir, 'SYSTEM'))
1045    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor'))
1046    os.makedirs(os.path.join(target_files_dir, 'PRODUCT'))
1047    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'product'))
1048    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor', 'odm'))
1049    os.makedirs(os.path.join(target_files_dir, 'VENDOR_DLKM'))
1050    partition_map = common.PartitionMapFromTargetFiles(target_files_dir)
1051    self.assertDictEqual(
1052        partition_map,
1053        {
1054            'system': 'SYSTEM',
1055            'vendor': 'SYSTEM/vendor',
1056            # Prefer PRODUCT over SYSTEM/product
1057            'product': 'PRODUCT',
1058            'odm': 'SYSTEM/vendor/odm',
1059            'vendor_dlkm': 'VENDOR_DLKM',
1060            # No system_ext or odm_dlkm
1061        })
1062
1063  def test_SharedUidPartitionViolations(self):
1064    uid_dict = {
1065        'android.uid.phone': {
1066            'system': ['system_phone.apk'],
1067            'system_ext': ['system_ext_phone.apk'],
1068        },
1069        'android.uid.wifi': {
1070            'vendor': ['vendor_wifi.apk'],
1071            'odm': ['odm_wifi.apk'],
1072        },
1073    }
1074    errors = common.SharedUidPartitionViolations(
1075        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1076    self.assertEqual(errors, [])
1077
1078  def test_SharedUidPartitionViolations_Violation(self):
1079    uid_dict = {
1080        'android.uid.phone': {
1081            'system': ['system_phone.apk'],
1082            'vendor': ['vendor_phone.apk'],
1083        },
1084    }
1085    errors = common.SharedUidPartitionViolations(
1086        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1087    self.assertIn(
1088        ('APK sharedUserId "android.uid.phone" found across partition groups '
1089         'in partitions "system,vendor"'), errors)
1090
1091  def test_GetSparseImage_missingImageFile(self):
1092    self.assertRaises(
1093        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
1094        None, False)
1095    self.assertRaises(
1096        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
1097        None, False)
1098
1099  @test_utils.SkipIfExternalToolsUnavailable()
1100  def test_GetSparseImage_missingBlockMapFile(self):
1101    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1102    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1103      target_files_zip.write(
1104          test_utils.construct_sparse_image([
1105              (0xCAC1, 6),
1106              (0xCAC3, 3),
1107              (0xCAC1, 4)]),
1108          arcname='IMAGES/system.img')
1109      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1110      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1111
1112    tempdir = common.UnzipTemp(target_files)
1113    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1114      self.assertRaises(
1115          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1116          False)
1117
1118  @test_utils.SkipIfExternalToolsUnavailable()
1119  def test_GetSparseImage_sharedBlocks_notAllowed(self):
1120    """Tests the case of having overlapping blocks but disallowed."""
1121    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1122    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1123      target_files_zip.write(
1124          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1125          arcname='IMAGES/system.img')
1126      # Block 10 is shared between two files.
1127      target_files_zip.writestr(
1128          'IMAGES/system.map',
1129          '\n'.join([
1130              '/system/file1 1-5 9-10',
1131              '/system/file2 10-12']))
1132      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1133      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1134
1135    tempdir = common.UnzipTemp(target_files)
1136    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1137      self.assertRaises(
1138          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1139          False)
1140
1141  @test_utils.SkipIfExternalToolsUnavailable()
1142  def test_GetSparseImage_sharedBlocks_allowed(self):
1143    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
1144    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1145    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1146      # Construct an image with a care_map of "0-5 9-12".
1147      target_files_zip.write(
1148          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1149          arcname='IMAGES/system.img')
1150      # Block 10 is shared between two files.
1151      target_files_zip.writestr(
1152          'IMAGES/system.map',
1153          '\n'.join([
1154              '/system/file1 1-5 9-10',
1155              '/system/file2 10-12']))
1156      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1157      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1158
1159    tempdir = common.UnzipTemp(target_files)
1160    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1161      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
1162
1163    self.assertDictEqual(
1164        {
1165            '__COPY': RangeSet("0"),
1166            '__NONZERO-0': RangeSet("6-8 13-15"),
1167            '/system/file1': RangeSet("1-5 9-10"),
1168            '/system/file2': RangeSet("11-12"),
1169        },
1170        sparse_image.file_map)
1171
1172    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
1173    # 'incomplete'.
1174    self.assertTrue(
1175        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
1176    self.assertNotIn(
1177        'incomplete', sparse_image.file_map['/system/file2'].extra)
1178
1179    # '/system/file1' will only contain one field -- a copy of the input text.
1180    self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra))
1181
1182    # Meta entries should not have any extra tag.
1183    self.assertFalse(sparse_image.file_map['__COPY'].extra)
1184    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
1185
1186  @test_utils.SkipIfExternalToolsUnavailable()
1187  def test_GetSparseImage_incompleteRanges(self):
1188    """Tests the case of ext4 images with holes."""
1189    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1190    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1191      target_files_zip.write(
1192          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1193          arcname='IMAGES/system.img')
1194      target_files_zip.writestr(
1195          'IMAGES/system.map',
1196          '\n'.join([
1197              '/system/file1 1-5 9-10',
1198              '/system/file2 11-12']))
1199      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1200      # '/system/file2' has less blocks listed (2) than actual (3).
1201      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1202
1203    tempdir = common.UnzipTemp(target_files)
1204    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1205      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1206
1207    self.assertEqual(
1208        '1-5 9-10',
1209        sparse_image.file_map['/system/file1'].extra['text_str'])
1210    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
1211
1212  @test_utils.SkipIfExternalToolsUnavailable()
1213  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
1214    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1215    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1216      target_files_zip.write(
1217          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1218          arcname='IMAGES/system.img')
1219      target_files_zip.writestr(
1220          'IMAGES/system.map',
1221          '\n'.join([
1222              '//system/file1 1-5 9-10',
1223              '//system/file2 11-12',
1224              '/system/app/file3 13-15']))
1225      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1226      # '/system/file2' has less blocks listed (2) than actual (3).
1227      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1228      # '/system/app/file3' has less blocks listed (3) than actual (4).
1229      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
1230
1231    tempdir = common.UnzipTemp(target_files)
1232    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1233      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1234
1235    self.assertEqual(
1236        '1-5 9-10',
1237        sparse_image.file_map['//system/file1'].extra['text_str'])
1238    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
1239    self.assertTrue(
1240        sparse_image.file_map['/system/app/file3'].extra['incomplete'])
1241
1242  @test_utils.SkipIfExternalToolsUnavailable()
1243  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
1244    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1245    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1246      target_files_zip.write(
1247          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1248          arcname='IMAGES/system.img')
1249      target_files_zip.writestr(
1250          'IMAGES/system.map',
1251          '\n'.join([
1252              '//system/file1 1-5 9-10',
1253              '//init.rc 13-15']))
1254      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1255      # '/init.rc' has less blocks listed (3) than actual (4).
1256      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
1257
1258    tempdir = common.UnzipTemp(target_files)
1259    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1260      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1261
1262    self.assertEqual(
1263        '1-5 9-10',
1264        sparse_image.file_map['//system/file1'].extra['text_str'])
1265    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
1266
1267  @test_utils.SkipIfExternalToolsUnavailable()
1268  def test_GetSparseImage_fileNotFound(self):
1269    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1270    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1271      target_files_zip.write(
1272          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1273          arcname='IMAGES/system.img')
1274      target_files_zip.writestr(
1275          'IMAGES/system.map',
1276          '\n'.join([
1277              '//system/file1 1-5 9-10',
1278              '//system/file2 11-12']))
1279      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1280
1281    tempdir = common.UnzipTemp(target_files)
1282    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1283      self.assertRaises(
1284          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1285          False)
1286
1287  @test_utils.SkipIfExternalToolsUnavailable()
1288  def test_GetAvbChainedPartitionArg(self):
1289    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1290    info_dict = {
1291        'avb_avbtool': 'avbtool',
1292        'avb_system_key_path': pubkey,
1293        'avb_system_rollback_index_location': 2,
1294    }
1295    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
1296    self.assertEqual(3, len(args))
1297    self.assertEqual('system', args[0])
1298    self.assertEqual('2', args[1])
1299    self.assertTrue(os.path.exists(args[2]))
1300
1301  @test_utils.SkipIfExternalToolsUnavailable()
1302  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
1303    key = os.path.join(self.testdata_dir, 'testkey.key')
1304    info_dict = {
1305        'avb_avbtool': 'avbtool',
1306        'avb_product_key_path': key,
1307        'avb_product_rollback_index_location': 2,
1308    }
1309    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
1310    self.assertEqual(3, len(args))
1311    self.assertEqual('product', args[0])
1312    self.assertEqual('2', args[1])
1313    self.assertTrue(os.path.exists(args[2]))
1314
1315  @test_utils.SkipIfExternalToolsUnavailable()
1316  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
1317    info_dict = {
1318        'avb_avbtool': 'avbtool',
1319        'avb_system_key_path': 'does-not-exist',
1320        'avb_system_rollback_index_location': 2,
1321    }
1322    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1323    args = common.GetAvbChainedPartitionArg(
1324        'system', info_dict, pubkey).split(':')
1325    self.assertEqual(3, len(args))
1326    self.assertEqual('system', args[0])
1327    self.assertEqual('2', args[1])
1328    self.assertTrue(os.path.exists(args[2]))
1329
1330  @test_utils.SkipIfExternalToolsUnavailable()
1331  def test_GetAvbChainedPartitionArg_invalidKey(self):
1332    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
1333    info_dict = {
1334        'avb_avbtool': 'avbtool',
1335        'avb_system_key_path': pubkey,
1336        'avb_system_rollback_index_location': 2,
1337    }
1338    self.assertRaises(
1339        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
1340        info_dict)
1341
1342  INFO_DICT_DEFAULT = {
1343      'recovery_api_version': 3,
1344      'fstab_version': 2,
1345      'system_root_image': 'true',
1346      'no_recovery' : 'true',
1347      'recovery_as_boot': 'true',
1348  }
1349
1350  def test_LoadListFromFile(self):
1351    file_path = os.path.join(self.testdata_dir,
1352                             'merge_config_framework_item_list')
1353    contents = common.LoadListFromFile(file_path)
1354    expected_contents = [
1355        'META/apkcerts.txt',
1356        'META/filesystem_config.txt',
1357        'META/root_filesystem_config.txt',
1358        'META/system_manifest.xml',
1359        'META/system_matrix.xml',
1360        'META/update_engine_config.txt',
1361        'PRODUCT/*',
1362        'ROOT/*',
1363        'SYSTEM/*',
1364    ]
1365    self.assertEqual(sorted(contents), sorted(expected_contents))
1366
1367  @staticmethod
1368  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
1369    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1370    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1371      info_values = ''.join(
1372          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
1373      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
1374
1375      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
1376      if info_dict.get('system_root_image') == 'true':
1377        fstab_values = FSTAB_TEMPLATE.format('/')
1378      else:
1379        fstab_values = FSTAB_TEMPLATE.format('/system')
1380      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)
1381
1382      common.ZipWriteStr(
1383          target_files_zip, 'META/file_contexts', 'file-contexts')
1384    return target_files
1385
1386  def test_LoadInfoDict(self):
1387    target_files = self._test_LoadInfoDict_createTargetFiles(
1388        self.INFO_DICT_DEFAULT,
1389        'BOOT/RAMDISK/system/etc/recovery.fstab')
1390    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1391      loaded_dict = common.LoadInfoDict(target_files_zip)
1392      self.assertEqual(3, loaded_dict['recovery_api_version'])
1393      self.assertEqual(2, loaded_dict['fstab_version'])
1394      self.assertIn('/', loaded_dict['fstab'])
1395      self.assertIn('/system', loaded_dict['fstab'])
1396
1397  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
1398    target_files = self._test_LoadInfoDict_createTargetFiles(
1399        self.INFO_DICT_DEFAULT,
1400        'BOOT/RAMDISK/etc/recovery.fstab')
1401    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1402      loaded_dict = common.LoadInfoDict(target_files_zip)
1403      self.assertEqual(3, loaded_dict['recovery_api_version'])
1404      self.assertEqual(2, loaded_dict['fstab_version'])
1405      self.assertIn('/', loaded_dict['fstab'])
1406      self.assertIn('/system', loaded_dict['fstab'])
1407
1408  @test_utils.SkipIfExternalToolsUnavailable()
1409  def test_LoadInfoDict_dirInput(self):
1410    target_files = self._test_LoadInfoDict_createTargetFiles(
1411        self.INFO_DICT_DEFAULT,
1412        'BOOT/RAMDISK/system/etc/recovery.fstab')
1413    unzipped = common.UnzipTemp(target_files)
1414    loaded_dict = common.LoadInfoDict(unzipped)
1415    self.assertEqual(3, loaded_dict['recovery_api_version'])
1416    self.assertEqual(2, loaded_dict['fstab_version'])
1417    self.assertIn('/', loaded_dict['fstab'])
1418    self.assertIn('/system', loaded_dict['fstab'])
1419
1420  @test_utils.SkipIfExternalToolsUnavailable()
1421  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
1422    target_files = self._test_LoadInfoDict_createTargetFiles(
1423        self.INFO_DICT_DEFAULT,
1424        'BOOT/RAMDISK/system/etc/recovery.fstab')
1425    unzipped = common.UnzipTemp(target_files)
1426    loaded_dict = common.LoadInfoDict(unzipped)
1427    self.assertEqual(3, loaded_dict['recovery_api_version'])
1428    self.assertEqual(2, loaded_dict['fstab_version'])
1429    self.assertIn('/', loaded_dict['fstab'])
1430    self.assertIn('/system', loaded_dict['fstab'])
1431
1432  def test_LoadInfoDict_systemRootImageFalse(self):
1433    # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
1434    # launched prior to P will likely have this config.
1435    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1436    del info_dict['no_recovery']
1437    del info_dict['system_root_image']
1438    del info_dict['recovery_as_boot']
1439    target_files = self._test_LoadInfoDict_createTargetFiles(
1440        info_dict,
1441        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1442    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1443      loaded_dict = common.LoadInfoDict(target_files_zip)
1444      self.assertEqual(3, loaded_dict['recovery_api_version'])
1445      self.assertEqual(2, loaded_dict['fstab_version'])
1446      self.assertNotIn('/', loaded_dict['fstab'])
1447      self.assertIn('/system', loaded_dict['fstab'])
1448
1449  def test_LoadInfoDict_recoveryAsBootFalse(self):
1450    # Devices using system-as-root, but with standalone recovery image. Non-A/B
1451    # devices launched since P will likely have this config.
1452    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1453    del info_dict['no_recovery']
1454    del info_dict['recovery_as_boot']
1455    target_files = self._test_LoadInfoDict_createTargetFiles(
1456        info_dict,
1457        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1458    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1459      loaded_dict = common.LoadInfoDict(target_files_zip)
1460      self.assertEqual(3, loaded_dict['recovery_api_version'])
1461      self.assertEqual(2, loaded_dict['fstab_version'])
1462      self.assertIn('/', loaded_dict['fstab'])
1463      self.assertIn('/system', loaded_dict['fstab'])
1464
1465  def test_LoadInfoDict_noRecoveryTrue(self):
1466    # Device doesn't have a recovery partition at all.
1467    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1468    del info_dict['recovery_as_boot']
1469    target_files = self._test_LoadInfoDict_createTargetFiles(
1470        info_dict,
1471        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1472    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1473      loaded_dict = common.LoadInfoDict(target_files_zip)
1474      self.assertEqual(3, loaded_dict['recovery_api_version'])
1475      self.assertEqual(2, loaded_dict['fstab_version'])
1476      self.assertIsNone(loaded_dict['fstab'])
1477
1478  @test_utils.SkipIfExternalToolsUnavailable()
1479  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
1480    target_files = self._test_LoadInfoDict_createTargetFiles(
1481        self.INFO_DICT_DEFAULT,
1482        'BOOT/RAMDISK/system/etc/recovery.fstab')
1483    common.ZipDelete(target_files, 'META/misc_info.txt')
1484    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1485      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
1486
1487  @test_utils.SkipIfExternalToolsUnavailable()
1488  def test_LoadInfoDict_repacking(self):
1489    target_files = self._test_LoadInfoDict_createTargetFiles(
1490        self.INFO_DICT_DEFAULT,
1491        'BOOT/RAMDISK/system/etc/recovery.fstab')
1492    unzipped = common.UnzipTemp(target_files)
1493    loaded_dict = common.LoadInfoDict(unzipped, True)
1494    self.assertEqual(3, loaded_dict['recovery_api_version'])
1495    self.assertEqual(2, loaded_dict['fstab_version'])
1496    self.assertIn('/', loaded_dict['fstab'])
1497    self.assertIn('/system', loaded_dict['fstab'])
1498    self.assertEqual(
1499        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
1500    self.assertEqual(
1501        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
1502        loaded_dict['root_fs_config'])
1503
1504  def test_LoadInfoDict_repackingWithZipFileInput(self):
1505    target_files = self._test_LoadInfoDict_createTargetFiles(
1506        self.INFO_DICT_DEFAULT,
1507        'BOOT/RAMDISK/system/etc/recovery.fstab')
1508    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1509      self.assertRaises(
1510          AssertionError, common.LoadInfoDict, target_files_zip, True)
1511
1512  def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self):
1513    framework_dict = {
1514        'use_dynamic_partitions': 'true',
1515        'super_partition_groups': 'group_a',
1516        'dynamic_partition_list': 'system',
1517        'super_group_a_partition_list': 'system',
1518    }
1519    vendor_dict = {
1520        'use_dynamic_partitions': 'true',
1521        'super_partition_groups': 'group_a group_b',
1522        'dynamic_partition_list': 'vendor product',
1523        'super_block_devices': 'super',
1524        'super_super_device_size': '3000',
1525        'super_group_a_partition_list': 'vendor',
1526        'super_group_a_group_size': '1000',
1527        'super_group_b_partition_list': 'product',
1528        'super_group_b_group_size': '2000',
1529    }
1530    merged_dict = common.MergeDynamicPartitionInfoDicts(
1531        framework_dict=framework_dict,
1532        vendor_dict=vendor_dict)
1533    expected_merged_dict = {
1534        'use_dynamic_partitions': 'true',
1535        'super_partition_groups': 'group_a group_b',
1536        'dynamic_partition_list': 'product system vendor',
1537        'super_block_devices': 'super',
1538        'super_super_device_size': '3000',
1539        'super_group_a_partition_list': 'system vendor',
1540        'super_group_a_group_size': '1000',
1541        'super_group_b_partition_list': 'product',
1542        'super_group_b_group_size': '2000',
1543    }
1544    self.assertEqual(merged_dict, expected_merged_dict)
1545
1546  def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self):
1547    framework_dict = {
1548        'use_dynamic_partitions': 'true',
1549        'super_partition_groups': 'group_a',
1550        'dynamic_partition_list': 'system',
1551        'super_group_a_partition_list': 'system',
1552        'super_group_a_group_size': '5000',
1553    }
1554    vendor_dict = {
1555        'use_dynamic_partitions': 'true',
1556        'super_partition_groups': 'group_a group_b',
1557        'dynamic_partition_list': 'vendor product',
1558        'super_group_a_partition_list': 'vendor',
1559        'super_group_a_group_size': '1000',
1560        'super_group_b_partition_list': 'product',
1561        'super_group_b_group_size': '2000',
1562    }
1563    merged_dict = common.MergeDynamicPartitionInfoDicts(
1564        framework_dict=framework_dict,
1565        vendor_dict=vendor_dict)
1566    expected_merged_dict = {
1567        'use_dynamic_partitions': 'true',
1568        'super_partition_groups': 'group_a group_b',
1569        'dynamic_partition_list': 'product system vendor',
1570        'super_group_a_partition_list': 'system vendor',
1571        'super_group_a_group_size': '1000',
1572        'super_group_b_partition_list': 'product',
1573        'super_group_b_group_size': '2000',
1574    }
1575    self.assertEqual(merged_dict, expected_merged_dict)
1576
1577  def test_GetAvbPartitionArg(self):
1578    info_dict = {}
1579    cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict)
1580    self.assertEqual(
1581        ['--include_descriptors_from_image', '/path/to/system.img'], cmd)
1582
1583  @test_utils.SkipIfExternalToolsUnavailable()
1584  def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self):
1585    testdata_dir = test_utils.get_testdata_dir()
1586    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1587    info_dict = {
1588        'avb_avbtool': 'avbtool',
1589        'avb_vendor_key_path': pubkey,
1590        'avb_vendor_rollback_index_location': 5,
1591    }
1592    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
1593    self.assertEqual(2, len(cmd))
1594    self.assertEqual('--chain_partition', cmd[0])
1595    chained_partition_args = cmd[1].split(':')
1596    self.assertEqual(3, len(chained_partition_args))
1597    self.assertEqual('vendor', chained_partition_args[0])
1598    self.assertEqual('5', chained_partition_args[1])
1599    self.assertTrue(os.path.exists(chained_partition_args[2]))
1600
1601  @test_utils.SkipIfExternalToolsUnavailable()
1602  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
1603    testdata_dir = test_utils.get_testdata_dir()
1604    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1605    info_dict = {
1606        'avb_avbtool': 'avbtool',
1607        'avb_recovery_key_path': pubkey,
1608        'avb_recovery_rollback_index_location': 3,
1609    }
1610    cmd = common.GetAvbPartitionArg(
1611        'recovery', '/path/to/recovery.img', info_dict)
1612    self.assertFalse(cmd)
1613
1614  @test_utils.SkipIfExternalToolsUnavailable()
1615  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self):
1616    testdata_dir = test_utils.get_testdata_dir()
1617    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1618    info_dict = {
1619        'ab_update': 'true',
1620        'avb_avbtool': 'avbtool',
1621        'avb_recovery_key_path': pubkey,
1622        'avb_recovery_rollback_index_location': 3,
1623    }
1624    cmd = common.GetAvbPartitionArg(
1625        'recovery', '/path/to/recovery.img', info_dict)
1626    self.assertEqual(2, len(cmd))
1627    self.assertEqual('--chain_partition', cmd[0])
1628    chained_partition_args = cmd[1].split(':')
1629    self.assertEqual(3, len(chained_partition_args))
1630    self.assertEqual('recovery', chained_partition_args[0])
1631    self.assertEqual('3', chained_partition_args[1])
1632    self.assertTrue(os.path.exists(chained_partition_args[2]))
1633
1634  def test_BuildVBMeta_appendAftlCommandSyntax(self):
1635    testdata_dir = test_utils.get_testdata_dir()
1636    common.OPTIONS.info_dict = {
1637        'ab_update': 'true',
1638        'avb_avbtool': 'avbtool',
1639        'build.prop': common.PartitionBuildProps.FromDictionary(
1640            'system', {
1641                'ro.build.version.incremental': '6285659',
1642                'ro.product.device': 'coral',
1643                'ro.build.fingerprint':
1644                'google/coral/coral:R/RP1A.200311.002/'
1645                '6285659:userdebug/dev-keys'}
1646        ),
1647    }
1648    common.OPTIONS.aftl_tool_path = 'aftltool'
1649    common.OPTIONS.aftl_server = 'log.endpoints.aftl-dev.cloud.goog:9000'
1650    common.OPTIONS.aftl_key_path = os.path.join(testdata_dir,
1651                                                'test_transparency_key.pub')
1652    common.OPTIONS.aftl_manufacturer_key_path = os.path.join(
1653        testdata_dir, 'test_aftl_rsa4096.pem')
1654
1655    vbmeta_image = tempfile.NamedTemporaryFile(delete=False)
1656    cmd = common.ConstructAftlMakeImageCommands(vbmeta_image.name)
1657    expected_cmd = [
1658        'aftltool', 'make_icp_from_vbmeta',
1659        '--vbmeta_image_path', 'place_holder',
1660        '--output', vbmeta_image.name,
1661        '--version_incremental', '6285659',
1662        '--transparency_log_servers',
1663        'log.endpoints.aftl-dev.cloud.goog:9000,{}'.format(
1664            common.OPTIONS.aftl_key_path),
1665        '--manufacturer_key', common.OPTIONS.aftl_manufacturer_key_path,
1666        '--algorithm', 'SHA256_RSA4096',
1667        '--padding', '4096']
1668
1669    # ignore the place holder, i.e. path to a temp file
1670    self.assertEqual(cmd[:3], expected_cmd[:3])
1671    self.assertEqual(cmd[4:], expected_cmd[4:])
1672
1673  @unittest.skip("enable after we have a server for public")
1674  def test_BuildVBMeta_appendAftlContactServer(self):
1675    testdata_dir = test_utils.get_testdata_dir()
1676    common.OPTIONS.info_dict = {
1677        'ab_update': 'true',
1678        'avb_avbtool': 'avbtool',
1679        'build.prop': common.PartitionBuildProps.FromDictionary(
1680            'system', {
1681                'ro.build.version.incremental': '6285659',
1682                'ro.product.device': 'coral',
1683                'ro.build.fingerprint':
1684                'google/coral/coral:R/RP1A.200311.002/'
1685                '6285659:userdebug/dev-keys'}
1686        )
1687    }
1688    common.OPTIONS.aftl_tool_path = "aftltool"
1689    common.OPTIONS.aftl_server = "log.endpoints.aftl-dev.cloud.goog:9000"
1690    common.OPTIONS.aftl_key_path = os.path.join(testdata_dir,
1691                                                'test_transparency_key.pub')
1692    common.OPTIONS.aftl_manufacturer_key_path = os.path.join(
1693        testdata_dir, 'test_aftl_rsa4096.pem')
1694
1695    input_dir = common.MakeTempDir()
1696    system_image = common.MakeTempFile()
1697    build_image_cmd = ['mkuserimg_mke2fs', input_dir, system_image, 'ext4',
1698                       '/system', str(4096 * 100), '-j', '0', '-s']
1699    common.RunAndCheckOutput(build_image_cmd)
1700
1701    add_footer_cmd = ['avbtool', 'add_hashtree_footer',
1702                      '--partition_size', str(4096 * 150),
1703                      '--partition_name', 'system',
1704                      '--image', system_image]
1705    common.RunAndCheckOutput(add_footer_cmd)
1706
1707    vbmeta_image = common.MakeTempFile()
1708    common.BuildVBMeta(vbmeta_image, {'system': system_image}, 'vbmeta',
1709                       ['system'])
1710
1711    verify_cmd = ['aftltool', 'verify_image_icp', '--vbmeta_image_path',
1712                  vbmeta_image, '--transparency_log_pub_keys',
1713                  common.OPTIONS.aftl_key_path]
1714    common.RunAndCheckOutput(verify_cmd)
1715
1716  @test_utils.SkipIfExternalToolsUnavailable()
1717  def test_AppendGkiSigningArgs_NoSigningKeyPath(self):
1718    # A non-GKI boot.img has no gki_signing_key_path.
1719    common.OPTIONS.info_dict = {
1720        # 'gki_signing_key_path': pubkey,
1721        'gki_signing_algorithm': 'SHA256_RSA4096',
1722        'gki_signing_signature_args': '--prop foo:bar',
1723    }
1724
1725    # Tests no --gki_signing_* args are appended if there is no
1726    # gki_signing_key_path.
1727    cmd = ['mkbootimg', '--header_version', '4']
1728    expected_cmd = ['mkbootimg', '--header_version', '4']
1729    common.AppendGkiSigningArgs(cmd)
1730    self.assertEqual(cmd, expected_cmd)
1731
1732  def test_AppendGkiSigningArgs_NoSigningAlgorithm(self):
1733    pubkey = os.path.join(self.testdata_dir, 'testkey_gki.pem')
1734    with open(pubkey, 'wb') as f:
1735      f.write(b'\x00' * 100)
1736    self.assertTrue(os.path.exists(pubkey))
1737
1738    # Tests no --gki_signing_* args are appended if there is no
1739    # gki_signing_algorithm.
1740    common.OPTIONS.info_dict = {
1741        'gki_signing_key_path': pubkey,
1742        # 'gki_signing_algorithm': 'SHA256_RSA4096',
1743        'gki_signing_signature_args': '--prop foo:bar',
1744    }
1745
1746    cmd = ['mkbootimg', '--header_version', '4']
1747    expected_cmd = ['mkbootimg', '--header_version', '4']
1748    common.AppendGkiSigningArgs(cmd)
1749    self.assertEqual(cmd, expected_cmd)
1750
1751  @test_utils.SkipIfExternalToolsUnavailable()
1752  def test_AppendGkiSigningArgs(self):
1753    pubkey = os.path.join(self.testdata_dir, 'testkey_gki.pem')
1754    with open(pubkey, 'wb') as f:
1755      f.write(b'\x00' * 100)
1756    self.assertTrue(os.path.exists(pubkey))
1757
1758    common.OPTIONS.info_dict = {
1759        'gki_signing_key_path': pubkey,
1760        'gki_signing_algorithm': 'SHA256_RSA4096',
1761        'gki_signing_signature_args': '--prop foo:bar',
1762    }
1763    cmd = ['mkbootimg', '--header_version', '4']
1764    common.AppendGkiSigningArgs(cmd)
1765
1766    expected_cmd = [
1767      'mkbootimg', '--header_version', '4',
1768      '--gki_signing_key', pubkey,
1769      '--gki_signing_algorithm', 'SHA256_RSA4096',
1770      '--gki_signing_signature_args', '--prop foo:bar'
1771    ]
1772    self.assertEqual(cmd, expected_cmd)
1773
1774  @test_utils.SkipIfExternalToolsUnavailable()
1775  def test_AppendGkiSigningArgs_KeyPathNotFound(self):
1776    pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem')
1777    self.assertFalse(os.path.exists(pubkey))
1778
1779    common.OPTIONS.info_dict = {
1780        'gki_signing_key_path': pubkey,
1781        'gki_signing_algorithm': 'SHA256_RSA4096',
1782        'gki_signing_signature_args': '--prop foo:bar',
1783    }
1784    cmd = ['mkbootimg', '--header_version', '4']
1785    self.assertRaises(common.ExternalError, common.AppendGkiSigningArgs, cmd)
1786
1787  @test_utils.SkipIfExternalToolsUnavailable()
1788  def test_AppendGkiSigningArgs_SearchKeyPath(self):
1789    pubkey = 'testkey_gki.pem'
1790    self.assertFalse(os.path.exists(pubkey))
1791
1792    # Tests it should replace the pubkey with an existed key under
1793    # OPTIONS.search_path, i.e., os.path.join(OPTIONS.search_path, pubkey).
1794    search_path_dir = common.MakeTempDir()
1795    search_pubkey = os.path.join(search_path_dir, pubkey)
1796    with open(search_pubkey, 'wb') as f:
1797      f.write(b'\x00' * 100)
1798    self.assertTrue(os.path.exists(search_pubkey))
1799
1800    common.OPTIONS.search_path = search_path_dir
1801    common.OPTIONS.info_dict = {
1802        'gki_signing_key_path': pubkey,
1803        'gki_signing_algorithm': 'SHA256_RSA4096',
1804        'gki_signing_signature_args': '--prop foo:bar',
1805    }
1806    cmd = ['mkbootimg', '--header_version', '4']
1807    common.AppendGkiSigningArgs(cmd)
1808
1809    expected_cmd = [
1810      'mkbootimg', '--header_version', '4',
1811      '--gki_signing_key', search_pubkey,
1812      '--gki_signing_algorithm', 'SHA256_RSA4096',
1813      '--gki_signing_signature_args', '--prop foo:bar'
1814    ]
1815    self.assertEqual(cmd, expected_cmd)
1816
1817  @test_utils.SkipIfExternalToolsUnavailable()
1818  def test_AppendGkiSigningArgs_SearchKeyPathNotFound(self):
1819    pubkey = 'no_testkey_gki.pem'
1820    self.assertFalse(os.path.exists(pubkey))
1821
1822    # Tests it should raise ExternalError if no key found under
1823    # OPTIONS.search_path.
1824    search_path_dir = common.MakeTempDir()
1825    search_pubkey = os.path.join(search_path_dir, pubkey)
1826    self.assertFalse(os.path.exists(search_pubkey))
1827
1828    common.OPTIONS.search_path = search_path_dir
1829    common.OPTIONS.info_dict = {
1830        'gki_signing_key_path': pubkey,
1831        'gki_signing_algorithm': 'SHA256_RSA4096',
1832        'gki_signing_signature_args': '--prop foo:bar',
1833    }
1834    cmd = ['mkbootimg', '--header_version', '4']
1835    self.assertRaises(common.ExternalError, common.AppendGkiSigningArgs, cmd)
1836
1837
1838class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
1839  """Checks the format of install-recovery.sh.
1840
1841  Its format should match between common.py and validate_target_files.py.
1842  """
1843
1844  def setUp(self):
1845    self._tempdir = common.MakeTempDir()
1846    # Create a fake dict that contains the fstab info for boot&recovery.
1847    self._info = {"fstab" : {}}
1848    fake_fstab = [
1849        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
1850        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
1851    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
1852    # Construct the gzipped recovery.img and boot.img
1853    self.recovery_data = bytearray([
1854        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
1855        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
1856        0x08, 0x00, 0x00, 0x00
1857    ])
1858    # echo -n "boot" | gzip -f | hd
1859    self.boot_data = bytearray([
1860        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
1861        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
1862    ])
1863
1864  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
1865    loc = os.path.join(self._tempdir, prefix, name)
1866    if not os.path.exists(os.path.dirname(loc)):
1867      os.makedirs(os.path.dirname(loc))
1868    with open(loc, "wb") as f:
1869      f.write(data)
1870
1871  def test_full_recovery(self):
1872    recovery_image = common.File("recovery.img", self.recovery_data)
1873    boot_image = common.File("boot.img", self.boot_data)
1874    self._info["full_recovery_image"] = "true"
1875
1876    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1877                             recovery_image, boot_image, self._info)
1878    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1879                                                        self._info)
1880
1881  @test_utils.SkipIfExternalToolsUnavailable()
1882  def test_recovery_from_boot(self):
1883    recovery_image = common.File("recovery.img", self.recovery_data)
1884    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
1885    boot_image = common.File("boot.img", self.boot_data)
1886    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
1887
1888    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1889                             recovery_image, boot_image, self._info)
1890    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1891                                                        self._info)
1892    # Validate 'recovery-from-boot' with bonus argument.
1893    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
1894    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1895                             recovery_image, boot_image, self._info)
1896    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1897                                                        self._info)
1898
1899
1900class MockBlockDifference(object):
1901
1902  def __init__(self, partition, tgt, src=None):
1903    self.partition = partition
1904    self.tgt = tgt
1905    self.src = src
1906
1907  def WriteScript(self, script, _, progress=None,
1908                  write_verify_script=False):
1909    if progress:
1910      script.AppendExtra("progress({})".format(progress))
1911    script.AppendExtra("patch({});".format(self.partition))
1912    if write_verify_script:
1913      self.WritePostInstallVerifyScript(script)
1914
1915  def WritePostInstallVerifyScript(self, script):
1916    script.AppendExtra("verify({});".format(self.partition))
1917
1918
1919class FakeSparseImage(object):
1920
1921  def __init__(self, size):
1922    self.blocksize = 4096
1923    self.total_blocks = size // 4096
1924    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
1925
1926
1927class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
1928
1929  @staticmethod
1930  def get_op_list(output_path):
1931    with zipfile.ZipFile(output_path, allowZip64=True) as output_zip:
1932      with output_zip.open('dynamic_partitions_op_list') as op_list:
1933        return [line.decode().strip() for line in op_list.readlines()
1934                if not line.startswith(b'#')]
1935
1936  def setUp(self):
1937    self.script = test_utils.MockScriptWriter()
1938    self.output_path = common.MakeTempFile(suffix='.zip')
1939
1940  def test_full(self):
1941    target_info = common.LoadDictionaryFromLines("""
1942dynamic_partition_list=system vendor
1943super_partition_groups=group_foo
1944super_group_foo_group_size={group_size}
1945super_group_foo_partition_list=system vendor
1946""".format(group_size=4 * GiB).split("\n"))
1947    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
1948                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
1949
1950    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
1951    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1952      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1953
1954    self.assertEqual(str(self.script).strip(), """
1955assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
1956patch(system);
1957verify(system);
1958unmap_partition("system");
1959patch(vendor);
1960verify(vendor);
1961unmap_partition("vendor");
1962""".strip())
1963
1964    lines = self.get_op_list(self.output_path)
1965
1966    remove_all_groups = lines.index("remove_all_groups")
1967    add_group = lines.index("add_group group_foo 4294967296")
1968    add_vendor = lines.index("add vendor group_foo")
1969    add_system = lines.index("add system group_foo")
1970    resize_vendor = lines.index("resize vendor 1073741824")
1971    resize_system = lines.index("resize system 3221225472")
1972
1973    self.assertLess(remove_all_groups, add_group,
1974                    "Should add groups after removing all groups")
1975    self.assertLess(add_group, min(add_vendor, add_system),
1976                    "Should add partitions after adding group")
1977    self.assertLess(add_system, resize_system,
1978                    "Should resize system after adding it")
1979    self.assertLess(add_vendor, resize_vendor,
1980                    "Should resize vendor after adding it")
1981
1982  def test_inc_groups(self):
1983    source_info = common.LoadDictionaryFromLines("""
1984super_partition_groups=group_foo group_bar group_baz
1985super_group_foo_group_size={group_foo_size}
1986super_group_bar_group_size={group_bar_size}
1987""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
1988    target_info = common.LoadDictionaryFromLines("""
1989super_partition_groups=group_foo group_baz group_qux
1990super_group_foo_group_size={group_foo_size}
1991super_group_baz_group_size={group_baz_size}
1992super_group_qux_group_size={group_qux_size}
1993""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
1994           group_qux_size=1 * GiB).split("\n"))
1995
1996    dp_diff = common.DynamicPartitionsDifference(target_info,
1997                                                 block_diffs=[],
1998                                                 source_info_dict=source_info)
1999    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
2000      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
2001
2002    lines = self.get_op_list(self.output_path)
2003
2004    removed = lines.index("remove_group group_bar")
2005    shrunk = lines.index("resize_group group_foo 3221225472")
2006    grown = lines.index("resize_group group_baz 4294967296")
2007    added = lines.index("add_group group_qux 1073741824")
2008
2009    self.assertLess(max(removed, shrunk),
2010                    min(grown, added),
2011                    "ops that remove / shrink partitions must precede ops that "
2012                    "grow / add partitions")
2013
2014  def test_incremental(self):
2015    source_info = common.LoadDictionaryFromLines("""
2016dynamic_partition_list=system vendor product system_ext
2017super_partition_groups=group_foo
2018super_group_foo_group_size={group_foo_size}
2019super_group_foo_partition_list=system vendor product system_ext
2020""".format(group_foo_size=4 * GiB).split("\n"))
2021    target_info = common.LoadDictionaryFromLines("""
2022dynamic_partition_list=system vendor product odm
2023super_partition_groups=group_foo group_bar
2024super_group_foo_group_size={group_foo_size}
2025super_group_foo_partition_list=system vendor odm
2026super_group_bar_group_size={group_bar_size}
2027super_group_bar_partition_list=product
2028""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
2029
2030    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
2031                                       src=FakeSparseImage(1024 * MiB)),
2032                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
2033                                       src=FakeSparseImage(1024 * MiB)),
2034                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
2035                                       src=FakeSparseImage(1024 * MiB)),
2036                   MockBlockDifference("system_ext", None,
2037                                       src=FakeSparseImage(1024 * MiB)),
2038                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
2039                                       src=None)]
2040
2041    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
2042                                                 source_info_dict=source_info)
2043    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
2044      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
2045
2046    metadata_idx = self.script.lines.index(
2047        'assert(update_dynamic_partitions(package_extract_file('
2048        '"dynamic_partitions_op_list")));')
2049    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
2050    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
2051    for p in ("product", "system", "odm"):
2052      patch_idx = self.script.lines.index("patch({});".format(p))
2053      verify_idx = self.script.lines.index("verify({});".format(p))
2054      self.assertLess(metadata_idx, patch_idx,
2055                      "Should patch {} after updating metadata".format(p))
2056      self.assertLess(patch_idx, verify_idx,
2057                      "Should verify {} after patching".format(p))
2058
2059    self.assertNotIn("patch(system_ext);", self.script.lines)
2060
2061    lines = self.get_op_list(self.output_path)
2062
2063    remove = lines.index("remove system_ext")
2064    move_product_out = lines.index("move product default")
2065    shrink = lines.index("resize vendor 536870912")
2066    shrink_group = lines.index("resize_group group_foo 3221225472")
2067    add_group_bar = lines.index("add_group group_bar 1073741824")
2068    add_odm = lines.index("add odm group_foo")
2069    grow_existing = lines.index("resize system 1610612736")
2070    grow_added = lines.index("resize odm 1073741824")
2071    move_product_in = lines.index("move product group_bar")
2072
2073    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
2074    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
2075
2076    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
2077                    "Must shrink group after partitions inside group are shrunk"
2078                    " / removed")
2079
2080    self.assertLess(add_group_bar, move_product_in,
2081                    "Must add partitions to group after group is added")
2082
2083    self.assertLess(max_idx_move_partition_out_foo,
2084                    min_idx_move_partition_in_foo,
2085                    "Must shrink partitions / remove partitions from group"
2086                    "before adding / moving partitions into group")
2087
2088  def test_remove_partition(self):
2089    source_info = common.LoadDictionaryFromLines("""
2090blockimgdiff_versions=3,4
2091use_dynamic_partitions=true
2092dynamic_partition_list=foo
2093super_partition_groups=group_foo
2094super_group_foo_group_size={group_foo_size}
2095super_group_foo_partition_list=foo
2096""".format(group_foo_size=4 * GiB).split("\n"))
2097    target_info = common.LoadDictionaryFromLines("""
2098blockimgdiff_versions=3,4
2099use_dynamic_partitions=true
2100super_partition_groups=group_foo
2101super_group_foo_group_size={group_foo_size}
2102""".format(group_foo_size=4 * GiB).split("\n"))
2103
2104    common.OPTIONS.info_dict = target_info
2105    common.OPTIONS.target_info_dict = target_info
2106    common.OPTIONS.source_info_dict = source_info
2107    common.OPTIONS.cache_size = 4 * 4096
2108
2109    block_diffs = [common.BlockDifference("foo", EmptyImage(),
2110                                          src=DataImage("source", pad=True))]
2111
2112    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
2113                                                 source_info_dict=source_info)
2114    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
2115      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
2116
2117    self.assertNotIn("block_image_update", str(self.script),
2118                     "Removed partition should not be patched.")
2119
2120    lines = self.get_op_list(self.output_path)
2121    self.assertEqual(lines, ["remove foo"])
2122
2123
2124class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
2125  def setUp(self):
2126    self.odm_build_prop = [
2127        'ro.odm.build.date.utc=1578430045',
2128        'ro.odm.build.fingerprint='
2129        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2130        'ro.product.odm.device=coral',
2131        'import /odm/etc/build_${ro.boot.product.device_name}.prop',
2132    ]
2133
2134  @staticmethod
2135  def _BuildZipFile(entries):
2136    input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip')
2137    with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip:
2138      for name, content in entries.items():
2139        input_zip.writestr(name, content)
2140
2141    return input_file
2142
2143  def test_parseBuildProps_noImportStatement(self):
2144    build_prop = [
2145        'ro.odm.build.date.utc=1578430045',
2146        'ro.odm.build.fingerprint='
2147        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2148        'ro.product.odm.device=coral',
2149    ]
2150    input_file = self._BuildZipFile({
2151        'ODM/etc/build.prop': '\n'.join(build_prop),
2152    })
2153
2154    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2155      placeholder_values = {
2156          'ro.boot.product.device_name': ['std', 'pro']
2157      }
2158      partition_props = common.PartitionBuildProps.FromInputFile(
2159          input_zip, 'odm', placeholder_values)
2160
2161    self.assertEqual({
2162        'ro.odm.build.date.utc': '1578430045',
2163        'ro.odm.build.fingerprint':
2164        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2165        'ro.product.odm.device': 'coral',
2166    }, partition_props.build_props)
2167
2168    self.assertEqual(set(), partition_props.prop_overrides)
2169
2170  def test_parseBuildProps_singleImportStatement(self):
2171    build_std_prop = [
2172        'ro.product.odm.device=coral',
2173        'ro.product.odm.name=product1',
2174    ]
2175    build_pro_prop = [
2176        'ro.product.odm.device=coralpro',
2177        'ro.product.odm.name=product2',
2178    ]
2179
2180    input_file = self._BuildZipFile({
2181        'ODM/etc/build.prop': '\n'.join(self.odm_build_prop),
2182        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2183        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2184    })
2185
2186    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2187      placeholder_values = {
2188          'ro.boot.product.device_name': 'std'
2189      }
2190      partition_props = common.PartitionBuildProps.FromInputFile(
2191          input_zip, 'odm', placeholder_values)
2192
2193    self.assertEqual({
2194      'ro.odm.build.date.utc': '1578430045',
2195      'ro.odm.build.fingerprint':
2196      'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2197      'ro.product.odm.device': 'coral',
2198      'ro.product.odm.name': 'product1',
2199    }, partition_props.build_props)
2200
2201    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2202      placeholder_values = {
2203          'ro.boot.product.device_name': 'pro'
2204      }
2205      partition_props = common.PartitionBuildProps.FromInputFile(
2206          input_zip, 'odm', placeholder_values)
2207
2208    self.assertEqual({
2209        'ro.odm.build.date.utc': '1578430045',
2210        'ro.odm.build.fingerprint':
2211        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2212        'ro.product.odm.device': 'coralpro',
2213        'ro.product.odm.name': 'product2',
2214    }, partition_props.build_props)
2215
2216  def test_parseBuildProps_noPlaceHolders(self):
2217    build_prop = copy.copy(self.odm_build_prop)
2218    input_file = self._BuildZipFile({
2219        'ODM/etc/build.prop': '\n'.join(build_prop),
2220    })
2221
2222    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2223      partition_props = common.PartitionBuildProps.FromInputFile(
2224          input_zip, 'odm')
2225
2226    self.assertEqual({
2227        'ro.odm.build.date.utc': '1578430045',
2228        'ro.odm.build.fingerprint':
2229        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2230        'ro.product.odm.device': 'coral',
2231    }, partition_props.build_props)
2232
2233    self.assertEqual(set(), partition_props.prop_overrides)
2234
2235  def test_parseBuildProps_multipleImportStatements(self):
2236    build_prop = copy.deepcopy(self.odm_build_prop)
2237    build_prop.append(
2238        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2239
2240    build_std_prop = [
2241        'ro.product.odm.device=coral',
2242    ]
2243    build_pro_prop = [
2244        'ro.product.odm.device=coralpro',
2245    ]
2246
2247    product1_prop = [
2248        'ro.product.odm.name=product1',
2249        'ro.product.not_care=not_care',
2250    ]
2251
2252    product2_prop = [
2253        'ro.product.odm.name=product2',
2254        'ro.product.not_care=not_care',
2255    ]
2256
2257    input_file = self._BuildZipFile({
2258        'ODM/etc/build.prop': '\n'.join(build_prop),
2259        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2260        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2261        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2262        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2263    })
2264
2265    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2266      placeholder_values = {
2267          'ro.boot.product.device_name': 'std',
2268          'ro.boot.product.product_name': 'product1',
2269          'ro.boot.product.not_care': 'not_care',
2270      }
2271      partition_props = common.PartitionBuildProps.FromInputFile(
2272          input_zip, 'odm', placeholder_values)
2273
2274    self.assertEqual({
2275        'ro.odm.build.date.utc': '1578430045',
2276        'ro.odm.build.fingerprint':
2277        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2278        'ro.product.odm.device': 'coral',
2279        'ro.product.odm.name': 'product1'
2280    }, partition_props.build_props)
2281
2282    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2283      placeholder_values = {
2284          'ro.boot.product.device_name': 'pro',
2285          'ro.boot.product.product_name': 'product2',
2286          'ro.boot.product.not_care': 'not_care',
2287      }
2288      partition_props = common.PartitionBuildProps.FromInputFile(
2289          input_zip, 'odm', placeholder_values)
2290
2291    self.assertEqual({
2292        'ro.odm.build.date.utc': '1578430045',
2293        'ro.odm.build.fingerprint':
2294        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2295        'ro.product.odm.device': 'coralpro',
2296        'ro.product.odm.name': 'product2'
2297    }, partition_props.build_props)
2298
2299  def test_parseBuildProps_defineAfterOverride(self):
2300    build_prop = copy.deepcopy(self.odm_build_prop)
2301    build_prop.append('ro.product.odm.device=coral')
2302
2303    build_std_prop = [
2304        'ro.product.odm.device=coral',
2305    ]
2306    build_pro_prop = [
2307        'ro.product.odm.device=coralpro',
2308    ]
2309
2310    input_file = self._BuildZipFile({
2311        'ODM/etc/build.prop': '\n'.join(build_prop),
2312        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2313        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2314    })
2315
2316    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2317      placeholder_values = {
2318          'ro.boot.product.device_name': 'std',
2319      }
2320
2321      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2322                        input_zip, 'odm', placeholder_values)
2323
2324  def test_parseBuildProps_duplicateOverride(self):
2325    build_prop = copy.deepcopy(self.odm_build_prop)
2326    build_prop.append(
2327        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2328
2329    build_std_prop = [
2330        'ro.product.odm.device=coral',
2331        'ro.product.odm.name=product1',
2332    ]
2333    build_pro_prop = [
2334        'ro.product.odm.device=coralpro',
2335    ]
2336
2337    product1_prop = [
2338        'ro.product.odm.name=product1',
2339    ]
2340
2341    product2_prop = [
2342        'ro.product.odm.name=product2',
2343    ]
2344
2345    input_file = self._BuildZipFile({
2346        'ODM/etc/build.prop': '\n'.join(build_prop),
2347        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2348        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2349        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2350        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2351    })
2352
2353    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2354      placeholder_values = {
2355          'ro.boot.product.device_name': 'std',
2356          'ro.boot.product.product_name': 'product1',
2357      }
2358      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2359                        input_zip, 'odm', placeholder_values)
2360