• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import subprocess
20import tempfile
21import time
22import zipfile
23from hashlib import sha1
24
25import common
26import test_utils
27import validate_target_files
28from images import EmptyImage, DataImage
29from rangelib import RangeSet
30
31
# Binary size units, expressed in bytes.
KiB = 2 ** 10
MiB = 2 ** 20
GiB = 2 ** 30
35
36
def get_2gb_string():
  """Yields the chunks of a sparse payload slightly larger than 2 GiB.

  The payload is produced in 4 MiB steps until 2 GiB + 1 bytes are covered.
  Each step yields a 4 KiB block of random bytes and then the zero bytes that
  fill the remainder of the step, i.e. random data separated by long holes.
  """
  total_size = int(2 * GiB + 1)
  random_len = 4 * KiB
  stride = 4 * MiB
  offset = 0
  while offset < total_size:
    yield os.urandom(random_len)
    yield b'\0' * (stride - random_len)
    offset += stride
45
46
class BuildInfoTest(test_utils.ReleaseToolsTestCase):
  """Tests common.BuildInfo using hand-built info dicts."""

  # Info dict with system and vendor build props and no OEM properties.
  TEST_INFO_DICT = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.device': 'product-device',
              'ro.product.name': 'product-name',
              'ro.build.fingerprint': 'build-fingerprint',
              'ro.build.foo': 'build-foo'}
      ),
      'system.build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.system.brand': 'product-brand',
              'ro.product.system.name': 'product-name',
              'ro.product.system.device': 'product-device',
              'ro.system.build.version.release': 'version-release',
              'ro.system.build.id': 'build-id',
              'ro.system.build.version.incremental': 'version-incremental',
              'ro.system.build.type': 'build-type',
              'ro.system.build.tags': 'build-tags',
              'ro.system.build.foo': 'build-foo'}
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor', {
              'ro.product.vendor.brand': 'vendor-product-brand',
              'ro.product.vendor.name': 'vendor-product-name',
              'ro.product.vendor.device': 'vendor-product-device',
              'ro.vendor.build.version.release': 'vendor-version-release',
              'ro.vendor.build.id': 'vendor-build-id',
              'ro.vendor.build.version.incremental':
              'vendor-version-incremental',
              'ro.vendor.build.type': 'vendor-build-type',
              'ro.vendor.build.tags': 'vendor-build-tags'}
      ),
      'property1': 'value1',
      'property2': 4096,
  }

  # Info dict that sets 'oem_fingerprint_properties'; it is used together with
  # TEST_OEM_DICTS below.
  TEST_INFO_DICT_USES_OEM_PROPS = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.name': 'product-name',
              'ro.build.thumbprint': 'build-thumbprint',
              'ro.build.bar': 'build-bar'}
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor', {
              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
      ),
      'property1': 'value1',
      'property2': 4096,
      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
  }

  # OEM property dicts passed as the second argument of common.BuildInfo.
  TEST_OEM_DICTS = [
      {
          'ro.product.brand': 'brand1',
          'ro.product.device': 'device1',
      },
      {
          'ro.product.brand': 'brand2',
          'ro.product.device': 'device2',
      },
      {
          'ro.product.brand': 'brand3',
          'ro.product.device': 'device3',
      },
  ]

  # Info dict with an explicit 'ro.product.property_source_order' and no
  # 'ro.product.device' in build.prop.
  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.build.fingerprint': 'build-fingerprint',
              'ro.product.property_source_order':
                  'product,odm,vendor,system_ext,system'}
      ),
      'system.build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.system.device': 'system-product-device'}
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor', {
              'ro.product.vendor.device': 'vendor-product-device'}
      ),
  }

  # Same idea with the source order that lists 'product_services', on a build
  # reporting release '10'.
  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.build.fingerprint': 'build-fingerprint',
              'ro.product.property_source_order':
                  'product,product_services,odm,vendor,system',
              'ro.build.version.release': '10',
              'ro.build.version.codename': 'REL'}
      ),
      'system.build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.system.device': 'system-product-device'}
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor', {
              'ro.product.vendor.device': 'vendor-product-device'}
      ),
  }

  # Build reporting release '9': no source order is given and build.prop
  # carries 'ro.product.device' itself.
  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.device': 'product-device',
              'ro.build.fingerprint': 'build-fingerprint',
              'ro.build.version.release': '9',
              'ro.build.version.codename': 'REL'}
      ),
      'system.build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.system.device': 'system-product-device'}
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor', {
              'ro.product.vendor.device': 'vendor-product-device'}
      ),
  }

  def test_init(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('product-device', target_info.device)
    self.assertEqual('build-fingerprint', target_info.fingerprint)
    self.assertFalse(target_info.is_ab)
    self.assertIsNone(target_info.oem_props)

  def test_init_with_oem_props(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   self.TEST_OEM_DICTS)
    self.assertEqual('device1', target_info.device)
    self.assertEqual('brand1/product-name/device1:build-thumbprint',
                     target_info.fingerprint)

    # Swap the order in oem_dicts, which would lead to different BuildInfo.
    # The shallow copy keeps the shared class-level list untouched.
    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   oem_dicts)
    self.assertEqual('device3', target_info.device)
    self.assertEqual('brand3/product-name/device3:build-thumbprint',
                     target_info.fingerprint)

  def test_init_badFingerprint(self):
    # Work on a deep copy so the class-level fixture is not modified.
    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
    info_dict['build.prop'].build_props[
        'ro.build.fingerprint'] = 'bad fingerprint'
    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)

    info_dict['build.prop'].build_props[
        'ro.build.fingerprint'] = 'bad\x80fingerprint'
    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)

  def test___getitem__(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('value1', target_info['property1'])
    self.assertEqual(4096, target_info['property2'])
    self.assertEqual('build-foo',
                     target_info['build.prop'].GetProp('ro.build.foo'))

  def test___getitem__with_oem_props(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   self.TEST_OEM_DICTS)
    self.assertEqual('value1', target_info['property1'])
    self.assertEqual(4096, target_info['property2'])
    self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo'))

  def test___setitem__(self):
    # Deep copy: this test writes through target_info into the info dict.
    target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
    self.assertEqual('value1', target_info['property1'])
    target_info['property1'] = 'value2'
    self.assertEqual('value2', target_info['property1'])

    self.assertEqual('build-foo',
                     target_info['build.prop'].GetProp('ro.build.foo'))
    target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar'
    self.assertEqual('build-bar',
                     target_info['build.prop'].GetProp('ro.build.foo'))

  def test_get(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('value1', target_info.get('property1'))
    self.assertEqual(4096, target_info.get('property2'))
    self.assertEqual(4096, target_info.get('property2', 1024))
    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    self.assertEqual('build-foo',
                     target_info.get('build.prop').GetProp('ro.build.foo'))

  def test_get_with_oem_props(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   self.TEST_OEM_DICTS)
    self.assertEqual('value1', target_info.get('property1'))
    self.assertEqual(4096, target_info.get('property2'))
    self.assertEqual(4096, target_info.get('property2', 1024))
    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo'))

  def test_items(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    items = target_info.items()
    self.assertIn(('property1', 'value1'), items)
    self.assertIn(('property2', 4096), items)

  def test_GetBuildProp(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
                      'ro.build.nonexistent')

  def test_GetBuildProp_with_oem_props(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   self.TEST_OEM_DICTS)
    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
                      'ro.build.nonexistent')

  def test_GetPartitionFingerprint(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual(
        target_info.GetPartitionFingerprint('vendor'),
        'vendor-product-brand/vendor-product-name/vendor-product-device'
        ':vendor-version-release/vendor-build-id/vendor-version-incremental'
        ':vendor-build-type/vendor-build-tags')

  def test_GetPartitionFingerprint_system_other_uses_system(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual(
        target_info.GetPartitionFingerprint('system_other'),
        target_info.GetPartitionFingerprint('system'))

  def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self):
    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
    info_dict['vendor.build.prop'].build_props[
        'ro.vendor.build.fingerprint'] = 'vendor:fingerprint'
    target_info = common.BuildInfo(info_dict, None)
    self.assertEqual(
        target_info.GetPartitionFingerprint('vendor'),
        'vendor:fingerprint')

  def test_WriteMountOemScript(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   self.TEST_OEM_DICTS)
    script_writer = test_utils.MockScriptWriter()
    target_info.WriteMountOemScript(script_writer)
    self.assertEqual([('Mount', '/oem', None)], script_writer.lines)

  def test_WriteDeviceAssertions(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
    script_writer = test_utils.MockScriptWriter()
    target_info.WriteDeviceAssertions(script_writer, False)
    self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines)

  def test_WriteDeviceAssertions_with_oem_props(self):
    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                                   self.TEST_OEM_DICTS)
    script_writer = test_utils.MockScriptWriter()
    target_info.WriteDeviceAssertions(script_writer, False)
    self.assertEqual(
        [
            ('AssertOemProperty', 'ro.product.device',
             ['device1', 'device2', 'device3'], False),
            ('AssertOemProperty', 'ro.product.brand',
             ['brand1', 'brand2', 'brand3'], False),
        ],
        script_writer.lines)

  def test_ResolveRoProductProperty_FromVendor(self):
    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
    info = common.BuildInfo(info_dict, None)
    self.assertEqual('vendor-product-device',
                     info.GetBuildProp('ro.product.device'))

  def test_ResolveRoProductProperty_FromSystem(self):
    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
    # Without the vendor value, the system value is the one expected below.
    del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device']
    info = common.BuildInfo(info_dict, None)
    self.assertEqual('system-product-device',
                     info.GetBuildProp('ro.product.device'))

  def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self):
    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
    info_dict['build.prop'].build_props[
        'ro.product.property_source_order'] = 'bad-source'
    # assertRaisesRegexp is a deprecated alias that recent Python 3 releases no
    # longer provide. Prefer assertRaisesRegex and fall back to the old name
    # only where the new one does not exist (Python 2).
    assert_raises_regex = getattr(
        self, 'assertRaisesRegex', None) or self.assertRaisesRegexp
    with assert_raises_regex(common.ExternalError,
                             'Invalid ro.product.property_source_order'):
      info = common.BuildInfo(info_dict, None)
      info.GetBuildProp('ro.product.device')

  def test_ResolveRoProductProperty_Android10PropertySearchOrder(self):
    info_dict = copy.deepcopy(
        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10)
    info = common.BuildInfo(info_dict, None)
    self.assertEqual('vendor-product-device',
                     info.GetBuildProp('ro.product.device'))

  def test_ResolveRoProductProperty_Android9PropertySearchOrder(self):
    info_dict = copy.deepcopy(
        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9)
    info = common.BuildInfo(info_dict, None)
    self.assertEqual('product-device',
                     info.GetBuildProp('ro.product.device'))
351
352
class CommonZipTest(test_utils.ReleaseToolsTestCase):
  """Tests the zip helpers in common: ZipWrite, ZipWriteStr, ZipClose,
  ZipDelete and UnzipTemp."""

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Checks one entry of the archive stored at zip_file_name.

    Args:
      zip_file: Unused; kept for the existing callers. The archive is always
          reopened from zip_file_name.
      zip_file_name: Path of the archive to inspect.
      arcname: Name of the entry to check.
      expected_hash: Expected SHA-1 hex digest of the entry contents.
      test_file_name: Optional path of the source file. When given, its mode
          and mtime must still match expected_stat.
      expected_stat: os.stat() result for test_file_name taken before the
          file was zipped. Required when test_file_name is given.
      expected_mode: Expected permission bits of the entry.
      expected_compress_type: Expected compression type of the entry.
    """
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context managers make sure both the
    # archive and the entry stream are closed again, even if a check fails.
    with zipfile.ZipFile(zip_file_name, "r") as verify_zip:
      # Verify the timestamp.
      info = verify_zip.getinfo(arcname)
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode.
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents, hashing in chunks to bound memory use.
      sha1_hash = sha1()
      with verify_zip.open(arcname) as entry:
        for chunk in iter(lambda: entry.read(4 * MiB), b''):
          sha1_hash.update(chunk)
      self.assertEqual(expected_hash, sha1_hash.hexdigest())
      self.assertIsNone(verify_zip.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Writes contents to a temp file, zips it with common.ZipWrite, verifies.

    Args:
      contents: A bytes object, or an iterable of chunks to write in order.
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite ("arcname", "perms" and "compress_type" also drive
          the expectations checked here).
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    # A bytes object is a single chunk. Iterating it directly would yield ints
    # on Python 3, and bytes(int) is a run of NUL bytes rather than the data.
    if isinstance(contents, bytes):
      contents = [contents]

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in contents:
        chunk = bytes(data)
        sha1_hash.update(chunk)
        test_file.write(chunk)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      # No-op on the normal path; releases the handle if writing failed.
      test_file.close()
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Writes contents with common.ZipWriteStr and verifies the entry.

    Args:
      zinfo_or_arcname: Entry name, or a zipfile.ZipInfo describing the entry.
      contents: The bytes to store.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        # A ZipInfo without external_attr is expected to end up as 0o600
        # unless "perms" is given explicitly.
        if zinfo_or_arcname.external_attr:
          zinfo_perms = zinfo_or_arcname.external_attr >> 16
        else:
          zinfo_perms = 0o600
        expected_mode = extra_args.get("perms", zinfo_perms)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Zips a large file via ZipWrite, then adds a small entry via ZipWriteStr.

    Args:
      large: Iterable of bytes chunks forming the large file.
      small: The bytes stored as entry "bar".
      extra_args: Optional dict of keyword arguments forwarded to both
          common.ZipWrite and common.ZipWriteStr.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      # No-op on the normal path; releases the handle if writing failed.
      test_file.close()
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Asserts zipfile.ZIP64_LIMIT is (1 << 31) - 1 before and after func."""
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o000,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'')
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'')

  def test_bug21309935(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms has the priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, "foo",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_ZipDelete(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    output_zip = zipfile.ZipFile(zip_file.name, 'w',
                                 compression=zipfile.ZIP_DEFLATED)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(os.urandom(1024))
      # ZipWrite is handed the file name, so the buffered data must be on disk
      # before it is called; otherwise the entries could be archived empty.
      entry_file.flush()
      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
      common.ZipClose(output_zip)
    zip_file.close()

    try:
      common.ZipDelete(zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertIn('Test1', entries)
        self.assertNotIn('Test2', entries)
        self.assertIn('Test3', entries)

      # Deleting an entry that is already gone must fail and change nothing.
      self.assertRaises(
          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertIn('Test1', entries)
        self.assertNotIn('Test2', entries)
        self.assertIn('Test3', entries)

      common.ZipDelete(zip_file.name, ['Test3'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertIn('Test1', entries)
        self.assertNotIn('Test2', entries)
        self.assertNotIn('Test3', entries)

      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertNotIn('Test1', entries)
        self.assertNotIn('Test2', entries)
        self.assertNotIn('Test3', entries)
    finally:
      os.remove(zip_file.name)

  @staticmethod
  def _test_UnzipTemp_createZipFile():
    """Creates a temp zip with five entries and returns its path.

    The entries are Test1, Test2, Foo3, Bar4 and Dir5/Baz5; each is written
    from the same 1024 random bytes.
    """
    zip_file = common.MakeTempFile(suffix='.zip')
    output_zip = zipfile.ZipFile(
        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
    contents = os.urandom(1024)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(contents)
      # ZipWrite is handed the file name, so the buffered data must be on disk
      # before it is called; otherwise the entries could be archived empty.
      entry_file.flush()
      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
      common.ZipClose(output_zip)
    return zip_file

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_UnzipTemp(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file)
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_UnzipTemp_withPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()

    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  def test_UnzipTemp_withEmptyPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, [])
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  def test_UnzipTemp_withNoMatchingPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
762
763
class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
  """Exercises the APK and certificate helpers exposed by common.

  Covers ReadApkCerts, ExtractPublicKey, ExtractAvbPublicKey, ParseCertificate,
  GetMinSdkVersion and GetMinSdkVersionInt.
  """

  # Entries without any compressed="..." attribute.
  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/make/target/product/security/platform.x509.pem"'
      ' private_key="build/make/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  # Cert map expected from parsing APKCERTS_TXT1.
  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/make/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  # Entries that all declare compressed="gz".
  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  # Cert map expected from parsing APKCERTS_TXT2.
  APKCERTS_CERTMAP2 = {
      'Compressed1.apk' : 'certs/compressed1',
      'Compressed2a.apk' : 'certs/compressed2',
      'Compressed2b.apk' : 'certs/compressed2',
      'Compressed3.apk' : 'certs/compressed3',
  }

  # A single entry that declares compressed="xz".
  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  # Cert map expected from parsing APKCERTS_TXT3.
  APKCERTS_CERTMAP3 = {
      'Compressed4.apk' : 'certs/compressed4',
  }

  # Mixes the optional attributes: the first entry has none of them, the
  # second has both compressed and partition, the third has partition only.
  APKCERTS_TXT4 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/make/target/product/security/platform.x509.pem"'
      ' private_key="build/make/target/product/security/platform.pk8"'
      ' compressed="gz" partition="system"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""'
      ' partition="product"\n'
  )

  # Cert map expected from parsing APKCERTS_TXT4.
  APKCERTS_CERTMAP4 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/make/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  def setUp(self):
    """Remembers where the test data files live."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir

  @staticmethod
  def _write_apkcerts_txt(apkcerts_txt, additional=None):
    """Creates a temp zip holding META/apkcerts.txt plus empty extra entries.

    Args:
      apkcerts_txt: Text to store as 'META/apkcerts.txt'.
      additional: Optional iterable of entry names, each written with empty
          content. None means no extra entries.

    Returns:
      The path of the zip file that was written.
    """
    extra_entries = [] if additional is None else additional
    zip_path = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_path, 'w') as output_zip:
      output_zip.writestr('META/apkcerts.txt', apkcerts_txt)
      for name in extra_entries:
        output_zip.writestr(name, '')
    return zip_path

  @staticmethod
  def _read_apkcerts(zip_path):
    """Opens the zip at zip_path and returns common.ReadApkCerts() for it."""
    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      return common.ReadApkCerts(input_zip)

  def test_ReadApkCerts_NoncompressedApks(self):
    """Parses entries without compression; no extension is reported."""
    zip_path = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    certmap, ext = self._read_apkcerts(zip_path)

    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    self.assertIsNone(ext)

  def test_ReadApkCerts_CompressedApks(self):
    """Parses compressed entries and checks the reported extension."""
    # Only Compressed1.apk.gz is present as a '.gz' entry. Compressed3.apk is
    # added without the '.gz' suffix.
    gz_zip = self._write_apkcerts_txt(
        self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])
    certmap, ext = self._read_apkcerts(gz_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    self.assertEqual('.gz', ext)

    # Same check for the '.xz' flavor.
    xz_zip = self._write_apkcerts_txt(
        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
    certmap, ext = self._read_apkcerts(xz_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    self.assertEqual('.xz', ext)

  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    """Parses a mix of compressed and non-compressed entries."""
    zip_path = self._write_apkcerts_txt(
        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])
    certmap, ext = self._read_apkcerts(zip_path)

    # The expected map is the union of both individual maps.
    expected_certmap = self.APKCERTS_CERTMAP1.copy()
    expected_certmap.update(self.APKCERTS_CERTMAP2)
    self.assertDictEqual(expected_certmap, certmap)
    self.assertEqual('.gz', ext)

  def test_ReadApkCerts_MultipleCompressionMethods(self):
    """Expects ValueError when both '.gz' and '.xz' APK entries exist."""
    zip_path = self._write_apkcerts_txt(
        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])

    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      with self.assertRaises(ValueError):
        common.ReadApkCerts(input_zip)

  def test_ReadApkCerts_MismatchingKeys(self):
    """Expects ValueError when certificate and private_key names differ."""
    # cert1 vs. cert2: the two attributes do not name the same key.
    mismatching_txt = (
        'name="App1.apk" certificate="certs/cert1.x509.pem"'
        ' private_key="certs/cert2.pk8"\n'
    )
    zip_path = self._write_apkcerts_txt(mismatching_txt)

    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      with self.assertRaises(ValueError):
        common.ReadApkCerts(input_zip)

  def test_ReadApkCerts_WithWithoutOptionalFields(self):
    """Parses entries that carry different subsets of optional fields."""
    zip_path = self._write_apkcerts_txt(self.APKCERTS_TXT4)
    certmap, ext = self._read_apkcerts(zip_path)

    self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap)
    self.assertIsNone(ext)

  def test_ExtractPublicKey(self):
    """Compares the key extracted from the cert with the stored pubkey."""
    cert_path = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    pubkey_path = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(pubkey_path) as pubkey_fp:
      expected_pubkey = pubkey_fp.read()
      self.assertEqual(expected_pubkey, common.ExtractPublicKey(cert_path))

  def test_ExtractPublicKey_invalidInput(self):
    """Expects AssertionError when given a .pk8 file instead of a cert."""
    not_a_cert = os.path.join(self.testdata_dir, 'testkey.pk8')
    with self.assertRaises(AssertionError):
      common.ExtractPublicKey(not_a_cert)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_ExtractAvbPublicKey(self):
    """Extraction from the private and the public key must agree."""
    privkey_path = os.path.join(self.testdata_dir, 'testkey.key')
    pubkey_path = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    from_privkey = common.ExtractAvbPublicKey('avbtool', privkey_path)
    from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey_path)
    # Both return values are opened as files and compared byte for byte.
    with open(from_privkey, 'rb') as privkey_fp:
      with open(from_pubkey, 'rb') as pubkey_fp:
        self.assertEqual(privkey_fp.read(), pubkey_fp.read())

  def test_ParseCertificate(self):
    """Compares ParseCertificate() output with openssl's DER output."""
    cert_path = os.path.join(self.testdata_dir, 'testkey.x509.pem')

    # Produce the reference bytes by letting openssl convert the cert to DER.
    openssl_cmd = ['openssl', 'x509', '-in', cert_path, '-outform', 'DER']
    proc = common.Run(
        openssl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        universal_newlines=False)
    expected_der, _ = proc.communicate()
    self.assertEqual(0, proc.returncode)

    with open(cert_path) as cert_fp:
      parsed = common.ParseCertificate(cert_fp.read())
    self.assertEqual(expected_der, parsed)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMinSdkVersion(self):
    """Reads the min SDK version of TestApp.apk as a string."""
    apk_path = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual('24', common.GetMinSdkVersion(apk_path))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMinSdkVersion_invalidInput(self):
    """Expects ExternalError for an APK path that does not exist."""
    with self.assertRaises(common.ExternalError):
      common.GetMinSdkVersion('does-not-exist.apk')

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMinSdkVersionInt(self):
    """Reads the min SDK version of TestApp.apk as an int."""
    apk_path = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual(24, common.GetMinSdkVersionInt(apk_path, {}))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMinSdkVersionInt_invalidInput(self):
    """Expects ExternalError for an APK path that does not exist."""
    with self.assertRaises(common.ExternalError):
      common.GetMinSdkVersionInt('does-not-exist.apk', {})
965
966
967class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
968
969  def setUp(self):
970    self.testdata_dir = test_utils.get_testdata_dir()
971
972  @test_utils.SkipIfExternalToolsUnavailable()
973  def test_GetSparseImage_emptyBlockMapFile(self):
974    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
975    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
976      target_files_zip.write(
977          test_utils.construct_sparse_image([
978              (0xCAC1, 6),
979              (0xCAC3, 3),
980              (0xCAC1, 4)]),
981          arcname='IMAGES/system.img')
982      target_files_zip.writestr('IMAGES/system.map', '')
983      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
984      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
985
986    tempdir = common.UnzipTemp(target_files)
987    with zipfile.ZipFile(target_files, 'r') as input_zip:
988      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
989
990    self.assertDictEqual(
991        {
992            '__COPY': RangeSet("0"),
993            '__NONZERO-0': RangeSet("1-5 9-12"),
994        },
995        sparse_image.file_map)
996
997  def test_GetSparseImage_missingImageFile(self):
998    self.assertRaises(
999        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
1000        None, False)
1001    self.assertRaises(
1002        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
1003        None, False)
1004
1005  @test_utils.SkipIfExternalToolsUnavailable()
1006  def test_GetSparseImage_missingBlockMapFile(self):
1007    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1008    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1009      target_files_zip.write(
1010          test_utils.construct_sparse_image([
1011              (0xCAC1, 6),
1012              (0xCAC3, 3),
1013              (0xCAC1, 4)]),
1014          arcname='IMAGES/system.img')
1015      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1016      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1017
1018    tempdir = common.UnzipTemp(target_files)
1019    with zipfile.ZipFile(target_files, 'r') as input_zip:
1020      self.assertRaises(
1021          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1022          False)
1023
1024  @test_utils.SkipIfExternalToolsUnavailable()
1025  def test_GetSparseImage_sharedBlocks_notAllowed(self):
1026    """Tests the case of having overlapping blocks but disallowed."""
1027    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1028    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1029      target_files_zip.write(
1030          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1031          arcname='IMAGES/system.img')
1032      # Block 10 is shared between two files.
1033      target_files_zip.writestr(
1034          'IMAGES/system.map',
1035          '\n'.join([
1036              '/system/file1 1-5 9-10',
1037              '/system/file2 10-12']))
1038      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1039      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1040
1041    tempdir = common.UnzipTemp(target_files)
1042    with zipfile.ZipFile(target_files, 'r') as input_zip:
1043      self.assertRaises(
1044          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1045          False)
1046
1047  @test_utils.SkipIfExternalToolsUnavailable()
1048  def test_GetSparseImage_sharedBlocks_allowed(self):
1049    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
1050    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1051    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1052      # Construct an image with a care_map of "0-5 9-12".
1053      target_files_zip.write(
1054          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1055          arcname='IMAGES/system.img')
1056      # Block 10 is shared between two files.
1057      target_files_zip.writestr(
1058          'IMAGES/system.map',
1059          '\n'.join([
1060              '/system/file1 1-5 9-10',
1061              '/system/file2 10-12']))
1062      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1063      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1064
1065    tempdir = common.UnzipTemp(target_files)
1066    with zipfile.ZipFile(target_files, 'r') as input_zip:
1067      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
1068
1069    self.assertDictEqual(
1070        {
1071            '__COPY': RangeSet("0"),
1072            '__NONZERO-0': RangeSet("6-8 13-15"),
1073            '/system/file1': RangeSet("1-5 9-10"),
1074            '/system/file2': RangeSet("11-12"),
1075        },
1076        sparse_image.file_map)
1077
1078    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
1079    # 'incomplete'.
1080    self.assertTrue(
1081        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
1082    self.assertNotIn(
1083        'incomplete', sparse_image.file_map['/system/file2'].extra)
1084
1085    # '/system/file1' will only contain one field -- a copy of the input text.
1086    self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra))
1087
1088    # Meta entries should not have any extra tag.
1089    self.assertFalse(sparse_image.file_map['__COPY'].extra)
1090    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
1091
1092  @test_utils.SkipIfExternalToolsUnavailable()
1093  def test_GetSparseImage_incompleteRanges(self):
1094    """Tests the case of ext4 images with holes."""
1095    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1096    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1097      target_files_zip.write(
1098          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1099          arcname='IMAGES/system.img')
1100      target_files_zip.writestr(
1101          'IMAGES/system.map',
1102          '\n'.join([
1103              '/system/file1 1-5 9-10',
1104              '/system/file2 11-12']))
1105      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1106      # '/system/file2' has less blocks listed (2) than actual (3).
1107      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1108
1109    tempdir = common.UnzipTemp(target_files)
1110    with zipfile.ZipFile(target_files, 'r') as input_zip:
1111      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1112
1113    self.assertEqual(
1114        '1-5 9-10',
1115        sparse_image.file_map['/system/file1'].extra['text_str'])
1116    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
1117
1118  @test_utils.SkipIfExternalToolsUnavailable()
1119  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
1120    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1121    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1122      target_files_zip.write(
1123          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1124          arcname='IMAGES/system.img')
1125      target_files_zip.writestr(
1126          'IMAGES/system.map',
1127          '\n'.join([
1128              '//system/file1 1-5 9-10',
1129              '//system/file2 11-12',
1130              '/system/app/file3 13-15']))
1131      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1132      # '/system/file2' has less blocks listed (2) than actual (3).
1133      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1134      # '/system/app/file3' has less blocks listed (3) than actual (4).
1135      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
1136
1137    tempdir = common.UnzipTemp(target_files)
1138    with zipfile.ZipFile(target_files, 'r') as input_zip:
1139      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1140
1141    self.assertEqual(
1142        '1-5 9-10',
1143        sparse_image.file_map['//system/file1'].extra['text_str'])
1144    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
1145    self.assertTrue(
1146        sparse_image.file_map['/system/app/file3'].extra['incomplete'])
1147
1148  @test_utils.SkipIfExternalToolsUnavailable()
1149  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
1150    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1151    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1152      target_files_zip.write(
1153          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1154          arcname='IMAGES/system.img')
1155      target_files_zip.writestr(
1156          'IMAGES/system.map',
1157          '\n'.join([
1158              '//system/file1 1-5 9-10',
1159              '//init.rc 13-15']))
1160      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1161      # '/init.rc' has less blocks listed (3) than actual (4).
1162      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
1163
1164    tempdir = common.UnzipTemp(target_files)
1165    with zipfile.ZipFile(target_files, 'r') as input_zip:
1166      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1167
1168    self.assertEqual(
1169        '1-5 9-10',
1170        sparse_image.file_map['//system/file1'].extra['text_str'])
1171    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
1172
1173  @test_utils.SkipIfExternalToolsUnavailable()
1174  def test_GetSparseImage_fileNotFound(self):
1175    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1176    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1177      target_files_zip.write(
1178          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1179          arcname='IMAGES/system.img')
1180      target_files_zip.writestr(
1181          'IMAGES/system.map',
1182          '\n'.join([
1183              '//system/file1 1-5 9-10',
1184              '//system/file2 11-12']))
1185      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1186
1187    tempdir = common.UnzipTemp(target_files)
1188    with zipfile.ZipFile(target_files, 'r') as input_zip:
1189      self.assertRaises(
1190          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1191          False)
1192
1193  @test_utils.SkipIfExternalToolsUnavailable()
1194  def test_GetAvbChainedPartitionArg(self):
1195    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1196    info_dict = {
1197        'avb_avbtool': 'avbtool',
1198        'avb_system_key_path': pubkey,
1199        'avb_system_rollback_index_location': 2,
1200    }
1201    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
1202    self.assertEqual(3, len(args))
1203    self.assertEqual('system', args[0])
1204    self.assertEqual('2', args[1])
1205    self.assertTrue(os.path.exists(args[2]))
1206
1207  @test_utils.SkipIfExternalToolsUnavailable()
1208  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
1209    key = os.path.join(self.testdata_dir, 'testkey.key')
1210    info_dict = {
1211        'avb_avbtool': 'avbtool',
1212        'avb_product_key_path': key,
1213        'avb_product_rollback_index_location': 2,
1214    }
1215    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
1216    self.assertEqual(3, len(args))
1217    self.assertEqual('product', args[0])
1218    self.assertEqual('2', args[1])
1219    self.assertTrue(os.path.exists(args[2]))
1220
1221  @test_utils.SkipIfExternalToolsUnavailable()
1222  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
1223    info_dict = {
1224        'avb_avbtool': 'avbtool',
1225        'avb_system_key_path': 'does-not-exist',
1226        'avb_system_rollback_index_location': 2,
1227    }
1228    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1229    args = common.GetAvbChainedPartitionArg(
1230        'system', info_dict, pubkey).split(':')
1231    self.assertEqual(3, len(args))
1232    self.assertEqual('system', args[0])
1233    self.assertEqual('2', args[1])
1234    self.assertTrue(os.path.exists(args[2]))
1235
1236  @test_utils.SkipIfExternalToolsUnavailable()
1237  def test_GetAvbChainedPartitionArg_invalidKey(self):
1238    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
1239    info_dict = {
1240        'avb_avbtool': 'avbtool',
1241        'avb_system_key_path': pubkey,
1242        'avb_system_rollback_index_location': 2,
1243    }
1244    self.assertRaises(
1245        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
1246        info_dict)
1247
  # Baseline info dict shared by the LoadInfoDict tests. It is passed to
  # _test_LoadInfoDict_createTargetFiles() to build the test zip; some tests
  # copy it and delete keys to model other device configurations.
  INFO_DICT_DEFAULT = {
      'recovery_api_version': 3,
      'fstab_version': 2,
      'system_root_image': 'true',
      'no_recovery' : 'true',
      'recovery_as_boot': 'true',
  }
1255
1256  def test_LoadListFromFile(self):
1257    file_path = os.path.join(self.testdata_dir,
1258                             'merge_config_framework_item_list')
1259    contents = common.LoadListFromFile(file_path)
1260    expected_contents = [
1261        'META/apkcerts.txt',
1262        'META/filesystem_config.txt',
1263        'META/root_filesystem_config.txt',
1264        'META/system_manifest.xml',
1265        'META/system_matrix.xml',
1266        'META/update_engine_config.txt',
1267        'PRODUCT/*',
1268        'ROOT/*',
1269        'SYSTEM/*',
1270    ]
1271    self.assertEqual(sorted(contents), sorted(expected_contents))
1272
1273  @staticmethod
1274  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
1275    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1276    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1277      info_values = ''.join(
1278          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
1279      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
1280
1281      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
1282      if info_dict.get('system_root_image') == 'true':
1283        fstab_values = FSTAB_TEMPLATE.format('/')
1284      else:
1285        fstab_values = FSTAB_TEMPLATE.format('/system')
1286      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)
1287
1288      common.ZipWriteStr(
1289          target_files_zip, 'META/file_contexts', 'file-contexts')
1290    return target_files
1291
1292  def test_LoadInfoDict(self):
1293    target_files = self._test_LoadInfoDict_createTargetFiles(
1294        self.INFO_DICT_DEFAULT,
1295        'BOOT/RAMDISK/system/etc/recovery.fstab')
1296    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1297      loaded_dict = common.LoadInfoDict(target_files_zip)
1298      self.assertEqual(3, loaded_dict['recovery_api_version'])
1299      self.assertEqual(2, loaded_dict['fstab_version'])
1300      self.assertIn('/', loaded_dict['fstab'])
1301      self.assertIn('/system', loaded_dict['fstab'])
1302
1303  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
1304    target_files = self._test_LoadInfoDict_createTargetFiles(
1305        self.INFO_DICT_DEFAULT,
1306        'BOOT/RAMDISK/etc/recovery.fstab')
1307    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1308      loaded_dict = common.LoadInfoDict(target_files_zip)
1309      self.assertEqual(3, loaded_dict['recovery_api_version'])
1310      self.assertEqual(2, loaded_dict['fstab_version'])
1311      self.assertIn('/', loaded_dict['fstab'])
1312      self.assertIn('/system', loaded_dict['fstab'])
1313
1314  @test_utils.SkipIfExternalToolsUnavailable()
1315  def test_LoadInfoDict_dirInput(self):
1316    target_files = self._test_LoadInfoDict_createTargetFiles(
1317        self.INFO_DICT_DEFAULT,
1318        'BOOT/RAMDISK/system/etc/recovery.fstab')
1319    unzipped = common.UnzipTemp(target_files)
1320    loaded_dict = common.LoadInfoDict(unzipped)
1321    self.assertEqual(3, loaded_dict['recovery_api_version'])
1322    self.assertEqual(2, loaded_dict['fstab_version'])
1323    self.assertIn('/', loaded_dict['fstab'])
1324    self.assertIn('/system', loaded_dict['fstab'])
1325
1326  @test_utils.SkipIfExternalToolsUnavailable()
1327  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
1328    target_files = self._test_LoadInfoDict_createTargetFiles(
1329        self.INFO_DICT_DEFAULT,
1330        'BOOT/RAMDISK/system/etc/recovery.fstab')
1331    unzipped = common.UnzipTemp(target_files)
1332    loaded_dict = common.LoadInfoDict(unzipped)
1333    self.assertEqual(3, loaded_dict['recovery_api_version'])
1334    self.assertEqual(2, loaded_dict['fstab_version'])
1335    self.assertIn('/', loaded_dict['fstab'])
1336    self.assertIn('/system', loaded_dict['fstab'])
1337
1338  def test_LoadInfoDict_systemRootImageFalse(self):
1339    # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
1340    # launched prior to P will likely have this config.
1341    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1342    del info_dict['no_recovery']
1343    del info_dict['system_root_image']
1344    del info_dict['recovery_as_boot']
1345    target_files = self._test_LoadInfoDict_createTargetFiles(
1346        info_dict,
1347        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1348    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1349      loaded_dict = common.LoadInfoDict(target_files_zip)
1350      self.assertEqual(3, loaded_dict['recovery_api_version'])
1351      self.assertEqual(2, loaded_dict['fstab_version'])
1352      self.assertNotIn('/', loaded_dict['fstab'])
1353      self.assertIn('/system', loaded_dict['fstab'])
1354
1355  def test_LoadInfoDict_recoveryAsBootFalse(self):
1356    # Devices using system-as-root, but with standalone recovery image. Non-A/B
1357    # devices launched since P will likely have this config.
1358    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1359    del info_dict['no_recovery']
1360    del info_dict['recovery_as_boot']
1361    target_files = self._test_LoadInfoDict_createTargetFiles(
1362        info_dict,
1363        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1364    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1365      loaded_dict = common.LoadInfoDict(target_files_zip)
1366      self.assertEqual(3, loaded_dict['recovery_api_version'])
1367      self.assertEqual(2, loaded_dict['fstab_version'])
1368      self.assertIn('/', loaded_dict['fstab'])
1369      self.assertIn('/system', loaded_dict['fstab'])
1370
1371  def test_LoadInfoDict_noRecoveryTrue(self):
1372    # Device doesn't have a recovery partition at all.
1373    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1374    del info_dict['recovery_as_boot']
1375    target_files = self._test_LoadInfoDict_createTargetFiles(
1376        info_dict,
1377        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1378    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1379      loaded_dict = common.LoadInfoDict(target_files_zip)
1380      self.assertEqual(3, loaded_dict['recovery_api_version'])
1381      self.assertEqual(2, loaded_dict['fstab_version'])
1382      self.assertIsNone(loaded_dict['fstab'])
1383
1384  @test_utils.SkipIfExternalToolsUnavailable()
1385  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
1386    target_files = self._test_LoadInfoDict_createTargetFiles(
1387        self.INFO_DICT_DEFAULT,
1388        'BOOT/RAMDISK/system/etc/recovery.fstab')
1389    common.ZipDelete(target_files, 'META/misc_info.txt')
1390    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1391      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
1392
1393  @test_utils.SkipIfExternalToolsUnavailable()
1394  def test_LoadInfoDict_repacking(self):
1395    target_files = self._test_LoadInfoDict_createTargetFiles(
1396        self.INFO_DICT_DEFAULT,
1397        'BOOT/RAMDISK/system/etc/recovery.fstab')
1398    unzipped = common.UnzipTemp(target_files)
1399    loaded_dict = common.LoadInfoDict(unzipped, True)
1400    self.assertEqual(3, loaded_dict['recovery_api_version'])
1401    self.assertEqual(2, loaded_dict['fstab_version'])
1402    self.assertIn('/', loaded_dict['fstab'])
1403    self.assertIn('/system', loaded_dict['fstab'])
1404    self.assertEqual(
1405        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
1406    self.assertEqual(
1407        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
1408        loaded_dict['root_fs_config'])
1409
1410  def test_LoadInfoDict_repackingWithZipFileInput(self):
1411    target_files = self._test_LoadInfoDict_createTargetFiles(
1412        self.INFO_DICT_DEFAULT,
1413        'BOOT/RAMDISK/system/etc/recovery.fstab')
1414    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1415      self.assertRaises(
1416          AssertionError, common.LoadInfoDict, target_files_zip, True)
1417
1418  def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self):
1419    framework_dict = {
1420        'super_partition_groups': 'group_a',
1421        'dynamic_partition_list': 'system',
1422        'super_group_a_partition_list': 'system',
1423    }
1424    vendor_dict = {
1425        'super_partition_groups': 'group_a group_b',
1426        'dynamic_partition_list': 'vendor product',
1427        'super_group_a_partition_list': 'vendor',
1428        'super_group_a_group_size': '1000',
1429        'super_group_b_partition_list': 'product',
1430        'super_group_b_group_size': '2000',
1431    }
1432    merged_dict = common.MergeDynamicPartitionInfoDicts(
1433        framework_dict=framework_dict,
1434        vendor_dict=vendor_dict)
1435    expected_merged_dict = {
1436        'super_partition_groups': 'group_a group_b',
1437        'dynamic_partition_list': 'system vendor product',
1438        'super_group_a_partition_list': 'system vendor',
1439        'super_group_a_group_size': '1000',
1440        'super_group_b_partition_list': 'product',
1441        'super_group_b_group_size': '2000',
1442    }
1443    self.assertEqual(merged_dict, expected_merged_dict)
1444
1445  def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self):
1446    framework_dict = {
1447        'super_partition_groups': 'group_a',
1448        'dynamic_partition_list': 'system',
1449        'super_group_a_partition_list': 'system',
1450        'super_group_a_group_size': '5000',
1451    }
1452    vendor_dict = {
1453        'super_partition_groups': 'group_a group_b',
1454        'dynamic_partition_list': 'vendor product',
1455        'super_group_a_partition_list': 'vendor',
1456        'super_group_a_group_size': '1000',
1457        'super_group_b_partition_list': 'product',
1458        'super_group_b_group_size': '2000',
1459    }
1460    merged_dict = common.MergeDynamicPartitionInfoDicts(
1461        framework_dict=framework_dict,
1462        vendor_dict=vendor_dict)
1463    expected_merged_dict = {
1464        'super_partition_groups': 'group_a group_b',
1465        'dynamic_partition_list': 'system vendor product',
1466        'super_group_a_partition_list': 'system vendor',
1467        'super_group_a_group_size': '1000',
1468        'super_group_b_partition_list': 'product',
1469        'super_group_b_group_size': '2000',
1470    }
1471    self.assertEqual(merged_dict, expected_merged_dict)
1472
1473  def test_GetAvbPartitionArg(self):
1474    info_dict = {}
1475    cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict)
1476    self.assertEqual(
1477        ['--include_descriptors_from_image', '/path/to/system.img'], cmd)
1478
1479  @test_utils.SkipIfExternalToolsUnavailable()
1480  def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self):
1481    testdata_dir = test_utils.get_testdata_dir()
1482    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1483    info_dict = {
1484        'avb_avbtool': 'avbtool',
1485        'avb_vendor_key_path': pubkey,
1486        'avb_vendor_rollback_index_location': 5,
1487    }
1488    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
1489    self.assertEqual(2, len(cmd))
1490    self.assertEqual('--chain_partition', cmd[0])
1491    chained_partition_args = cmd[1].split(':')
1492    self.assertEqual(3, len(chained_partition_args))
1493    self.assertEqual('vendor', chained_partition_args[0])
1494    self.assertEqual('5', chained_partition_args[1])
1495    self.assertTrue(os.path.exists(chained_partition_args[2]))
1496
1497  @test_utils.SkipIfExternalToolsUnavailable()
1498  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
1499    testdata_dir = test_utils.get_testdata_dir()
1500    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1501    info_dict = {
1502        'avb_avbtool': 'avbtool',
1503        'avb_recovery_key_path': pubkey,
1504        'avb_recovery_rollback_index_location': 3,
1505    }
1506    cmd = common.GetAvbPartitionArg(
1507        'recovery', '/path/to/recovery.img', info_dict)
1508    self.assertFalse(cmd)
1509
1510  @test_utils.SkipIfExternalToolsUnavailable()
1511  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self):
1512    testdata_dir = test_utils.get_testdata_dir()
1513    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1514    info_dict = {
1515        'ab_update': 'true',
1516        'avb_avbtool': 'avbtool',
1517        'avb_recovery_key_path': pubkey,
1518        'avb_recovery_rollback_index_location': 3,
1519    }
1520    cmd = common.GetAvbPartitionArg(
1521        'recovery', '/path/to/recovery.img', info_dict)
1522    self.assertEqual(2, len(cmd))
1523    self.assertEqual('--chain_partition', cmd[0])
1524    chained_partition_args = cmd[1].split(':')
1525    self.assertEqual(3, len(chained_partition_args))
1526    self.assertEqual('recovery', chained_partition_args[0])
1527    self.assertEqual('3', chained_partition_args[1])
1528    self.assertTrue(os.path.exists(chained_partition_args[2]))
1529
1530
class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
  """Checks the format of install-recovery.sh.

  The script written by common.MakeRecoveryPatch must be accepted by
  validate_target_files.ValidateInstallRecoveryScript, i.e. the format has to
  match between common.py and validate_target_files.py.
  """

  def setUp(self):
    """Creates a temp dir, a boot/recovery fstab and two tiny gzip images."""
    self._tempdir = common.MakeTempDir()
    # Dummy fstab lines covering only the boot and recovery partitions.
    fstab_lines = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    # NOTE(review): "\n".join is passed where LoadRecoveryFSTab presumably
    # expects a reader callable that is applied to its third argument, so the
    # list of lines takes the place of a file path -- confirm in common.py.
    fstab = common.LoadRecoveryFSTab("\n".join, 2, fstab_lines)
    self._info = {"fstab": fstab}
    # Gzipped recovery.img; the gzip trailer records an 8-byte payload.
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
        0x08, 0x00, 0x00, 0x00
    ])
    # Gzipped boot.img: echo -n "boot" | gzip -f | hd
    self.boot_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    ])

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    """Writes data to <tempdir>/<prefix>/<name>, creating missing dirs."""
    target = os.path.join(self._tempdir, prefix, name)
    parent = os.path.dirname(target)
    if not os.path.exists(parent):
      os.makedirs(parent)
    with open(target, "wb") as out:
      out.write(data)

  def _make_patch_and_validate(self, recovery_image, boot_image):
    """Generates the recovery patch files, then validates the script."""
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_full_recovery(self):
    """Validates the script generated with full_recovery_image enabled."""
    self._info["full_recovery_image"] = "true"
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)

    self._make_patch_and_validate(recovery_image, boot_image)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_recovery_from_boot(self):
    """Validates the recovery-from-boot script, without and with bonus."""
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    # Both images are also placed under IMAGES/ in the temp dir.
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    self._make_patch_and_validate(recovery_image, boot_image)

    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
    self._make_patch_and_validate(recovery_image, boot_image)
1591
1592
class MockBlockDifference(object):
  """Lightweight test double for a block difference object.

  It stores the partition name with its target/source images and, instead of
  real update commands, appends easily recognizable marker lines to the
  script it is given.
  """

  def __init__(self, partition, tgt, src=None):
    self.partition, self.tgt, self.src = partition, tgt, src

  def WriteScript(self, script, _, progress=None,
                  write_verify_script=False):
    """Appends a progress line (if progress is truthy) and a patch line.

    When write_verify_script is set, the verify line is appended as well.
    The second positional argument is ignored.
    """
    pending = []
    if progress:
      pending.append("progress({})".format(progress))
    pending.append("patch({});".format(self.partition))
    for entry in pending:
      script.AppendExtra(entry)
    if write_verify_script:
      self.WritePostInstallVerifyScript(script)

  def WritePostInstallVerifyScript(self, script):
    """Appends the verify marker line for this partition."""
    verify_line = "verify({});".format(self.partition)
    script.AppendExtra(verify_line)
1610
1611
class FakeSparseImage(object):
  """Image stub that only exposes blocksize and total_blocks.

  size must be a multiple of 4096 (the fixed block size); total_blocks is
  size divided by that block size.
  """

  def __init__(self, size):
    block_count, remainder = divmod(size, 4096)
    assert remainder == 0, "{} is not a multiple of 4096".format(size)
    self.blocksize = 4096
    self.total_blocks = block_count
1618
1619
class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
  """Tests the script and op list written by DynamicPartitionsDifference.

  Every test builds a common.DynamicPartitionsDifference, calls its
  WriteScript() with a MockScriptWriter and an output zip, and then inspects
  the generated script lines and the 'dynamic_partitions_op_list' zip entry.
  """

  @staticmethod
  def get_op_list(output_path):
    """Returns the op list entries stored in the zip at output_path.

    Lines starting with '#' are dropped; the remaining ones are decoded and
    stripped of surrounding whitespace.
    """
    with zipfile.ZipFile(output_path) as output_zip:
      with output_zip.open('dynamic_partitions_op_list') as op_list:
        return [line.decode().strip() for line in op_list.readlines()
                if not line.startswith(b'#')]

  def setUp(self):
    self.script = test_utils.MockScriptWriter()
    self.output_path = common.MakeTempFile(suffix='.zip')

  def _set_option(self, name, value):
    """Sets common.OPTIONS.<name> and restores the prior state on cleanup.

    common.OPTIONS is shared module state; without the restore, values set by
    one test would leak into whichever tests run afterwards. Assumes the base
    test case derives from unittest.TestCase (for addCleanup).
    """
    missing = object()
    previous = getattr(common.OPTIONS, name, missing)
    if previous is missing:
      self.addCleanup(delattr, common.OPTIONS, name)
    else:
      self.addCleanup(setattr, common.OPTIONS, name, previous)
    setattr(common.OPTIONS, name, value)

  def test_full(self):
    """Full update: all groups are recreated, then partitions are patched."""
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(system);
verify(system);
unmap_partition("system");
patch(vendor);
verify(vendor);
unmap_partition("vendor");
""".strip())

    lines = self.get_op_list(self.output_path)

    remove_all_groups = lines.index("remove_all_groups")
    add_group = lines.index("add_group group_foo 4294967296")
    add_vendor = lines.index("add vendor group_foo")
    add_system = lines.index("add system group_foo")
    resize_vendor = lines.index("resize vendor 1073741824")
    resize_system = lines.index("resize system 3221225472")

    self.assertLess(remove_all_groups, add_group,
                    "Should add groups after removing all groups")
    self.assertLess(add_group, min(add_vendor, add_system),
                    "Should add partitions after adding group")
    self.assertLess(add_system, resize_system,
                    "Should resize system after adding it")
    self.assertLess(add_vendor, resize_vendor,
                    "Should resize vendor after adding it")

  def test_inc_groups(self):
    """Incremental update with group changes only (no block diffs).

    group_bar is removed, group_foo shrinks, group_baz gets a size of 4 GiB
    and group_qux is added.
    """
    source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
           group_qux_size=1 * GiB).split("\n"))

    dp_diff = common.DynamicPartitionsDifference(target_info,
                                                 block_diffs=[],
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    lines = self.get_op_list(self.output_path)

    removed = lines.index("remove_group group_bar")
    shrunk = lines.index("resize_group group_foo 3221225472")
    grown = lines.index("resize_group group_baz 4294967296")
    added = lines.index("add_group group_qux 1073741824")

    self.assertLess(max(removed, shrunk),
                    min(grown, added),
                    "ops that remove / shrink partitions must precede ops that "
                    "grow / add partitions")

  def test_incremental(self):
    """Incremental update mixing shrink, grow, add, remove and move.

    Between source and target: vendor shrinks, system grows, system_ext is
    removed, odm is added, and product moves from group_foo to the new
    group_bar while group_foo itself shrinks.
    """
    source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product system_ext
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product system_ext
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))

    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("system_ext", None,
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
                                       src=None)]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    metadata_idx = self.script.lines.index(
        'assert(update_dynamic_partitions(package_extract_file('
        '"dynamic_partitions_op_list")));')
    # The shrinking partition (vendor) is patched before the metadata update
    # and verified after it; all others are patched after the update.
    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
    for p in ("product", "system", "odm"):
      patch_idx = self.script.lines.index("patch({});".format(p))
      verify_idx = self.script.lines.index("verify({});".format(p))
      self.assertLess(metadata_idx, patch_idx,
                      "Should patch {} after updating metadata".format(p))
      self.assertLess(patch_idx, verify_idx,
                      "Should verify {} after patching".format(p))

    self.assertNotIn("patch(system_ext);", self.script.lines)

    lines = self.get_op_list(self.output_path)

    remove = lines.index("remove system_ext")
    move_product_out = lines.index("move product default")
    shrink = lines.index("resize vendor 536870912")
    shrink_group = lines.index("resize_group group_foo 3221225472")
    add_group_bar = lines.index("add_group group_bar 1073741824")
    add_odm = lines.index("add odm group_foo")
    grow_existing = lines.index("resize system 1610612736")
    grow_added = lines.index("resize odm 1073741824")
    move_product_in = lines.index("move product group_bar")

    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)

    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
                    "Must shrink group after partitions inside group are shrunk"
                    " / removed")

    self.assertLess(add_group_bar, move_product_in,
                    "Must add partitions to group after group is added")

    # The first literal ends with a space so the concatenated message does
    # not read "...from groupbefore adding...".
    self.assertLess(max_idx_move_partition_out_foo,
                    min_idx_move_partition_in_foo,
                    "Must shrink partitions / remove partitions from group "
                    "before adding / moving partitions into group")

  def test_remove_partition(self):
    """A partition missing from the target is removed and never patched."""
    source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))

    # This test uses the real common.BlockDifference, so the global options
    # are set here; _set_option() undoes each change when the test finishes.
    self._set_option('info_dict', target_info)
    self._set_option('target_info_dict', target_info)
    self._set_option('source_info_dict', source_info)
    self._set_option('cache_size', 4 * 4096)

    block_diffs = [common.BlockDifference("foo", EmptyImage(),
                                          src=DataImage("source", pad=True))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertNotIn("block_image_update", str(self.script),
                     "Removed partition should not be patched.")

    lines = self.get_op_list(self.output_path)
    self.assertEqual(lines, ["remove foo"])
1815
1816
1817class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
1818  def setUp(self):
1819    self.odm_build_prop = [
1820        'ro.odm.build.date.utc=1578430045',
1821        'ro.odm.build.fingerprint='
1822        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1823        'ro.product.odm.device=coral',
1824        'import /odm/etc/build_${ro.boot.product.device_name}.prop',
1825    ]
1826
1827  @staticmethod
1828  def _BuildZipFile(entries):
1829    input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1830    with zipfile.ZipFile(input_file, 'w') as input_zip:
1831      for name, content in entries.items():
1832        input_zip.writestr(name, content)
1833
1834    return input_file
1835
1836  def test_parseBuildProps_noImportStatement(self):
1837    build_prop = [
1838        'ro.odm.build.date.utc=1578430045',
1839        'ro.odm.build.fingerprint='
1840        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1841        'ro.product.odm.device=coral',
1842    ]
1843    input_file = self._BuildZipFile({
1844        'ODM/etc/build.prop': '\n'.join(build_prop),
1845    })
1846
1847    with zipfile.ZipFile(input_file, 'r') as input_zip:
1848      placeholder_values = {
1849          'ro.boot.product.device_name': ['std', 'pro']
1850      }
1851      partition_props = common.PartitionBuildProps.FromInputFile(
1852          input_zip, 'odm', placeholder_values)
1853
1854    self.assertEqual({
1855        'ro.odm.build.date.utc': '1578430045',
1856        'ro.odm.build.fingerprint':
1857        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1858        'ro.product.odm.device': 'coral',
1859    }, partition_props.build_props)
1860
1861    self.assertEqual(set(), partition_props.prop_overrides)
1862
1863  def test_parseBuildProps_singleImportStatement(self):
1864    build_std_prop = [
1865        'ro.product.odm.device=coral',
1866        'ro.product.odm.name=product1',
1867    ]
1868    build_pro_prop = [
1869        'ro.product.odm.device=coralpro',
1870        'ro.product.odm.name=product2',
1871    ]
1872
1873    input_file = self._BuildZipFile({
1874        'ODM/etc/build.prop': '\n'.join(self.odm_build_prop),
1875        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
1876        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
1877    })
1878
1879    with zipfile.ZipFile(input_file, 'r') as input_zip:
1880      placeholder_values = {
1881          'ro.boot.product.device_name': 'std'
1882      }
1883      partition_props = common.PartitionBuildProps.FromInputFile(
1884          input_zip, 'odm', placeholder_values)
1885
1886    self.assertEqual({
1887      'ro.odm.build.date.utc': '1578430045',
1888      'ro.odm.build.fingerprint':
1889      'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1890      'ro.product.odm.device': 'coral',
1891      'ro.product.odm.name': 'product1',
1892    }, partition_props.build_props)
1893
1894    with zipfile.ZipFile(input_file, 'r') as input_zip:
1895      placeholder_values = {
1896          'ro.boot.product.device_name': 'pro'
1897      }
1898      partition_props = common.PartitionBuildProps.FromInputFile(
1899          input_zip, 'odm', placeholder_values)
1900
1901    self.assertEqual({
1902        'ro.odm.build.date.utc': '1578430045',
1903        'ro.odm.build.fingerprint':
1904        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1905        'ro.product.odm.device': 'coralpro',
1906        'ro.product.odm.name': 'product2',
1907    }, partition_props.build_props)
1908
1909  def test_parseBuildProps_noPlaceHolders(self):
1910    build_prop = copy.copy(self.odm_build_prop)
1911    input_file = self._BuildZipFile({
1912        'ODM/etc/build.prop': '\n'.join(build_prop),
1913    })
1914
1915    with zipfile.ZipFile(input_file, 'r') as input_zip:
1916      partition_props = common.PartitionBuildProps.FromInputFile(
1917          input_zip, 'odm')
1918
1919    self.assertEqual({
1920        'ro.odm.build.date.utc': '1578430045',
1921        'ro.odm.build.fingerprint':
1922        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1923        'ro.product.odm.device': 'coral',
1924    }, partition_props.build_props)
1925
1926    self.assertEqual(set(), partition_props.prop_overrides)
1927
1928  def test_parseBuildProps_multipleImportStatements(self):
1929    build_prop = copy.deepcopy(self.odm_build_prop)
1930    build_prop.append(
1931        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
1932
1933    build_std_prop = [
1934        'ro.product.odm.device=coral',
1935    ]
1936    build_pro_prop = [
1937        'ro.product.odm.device=coralpro',
1938    ]
1939
1940    product1_prop = [
1941        'ro.product.odm.name=product1',
1942        'ro.product.not_care=not_care',
1943    ]
1944
1945    product2_prop = [
1946        'ro.product.odm.name=product2',
1947        'ro.product.not_care=not_care',
1948    ]
1949
1950    input_file = self._BuildZipFile({
1951        'ODM/etc/build.prop': '\n'.join(build_prop),
1952        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
1953        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
1954        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
1955        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
1956    })
1957
1958    with zipfile.ZipFile(input_file, 'r') as input_zip:
1959      placeholder_values = {
1960          'ro.boot.product.device_name': 'std',
1961          'ro.boot.product.product_name': 'product1',
1962          'ro.boot.product.not_care': 'not_care',
1963      }
1964      partition_props = common.PartitionBuildProps.FromInputFile(
1965          input_zip, 'odm', placeholder_values)
1966
1967    self.assertEqual({
1968        'ro.odm.build.date.utc': '1578430045',
1969        'ro.odm.build.fingerprint':
1970        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1971        'ro.product.odm.device': 'coral',
1972        'ro.product.odm.name': 'product1'
1973    }, partition_props.build_props)
1974
1975    with zipfile.ZipFile(input_file, 'r') as input_zip:
1976      placeholder_values = {
1977          'ro.boot.product.device_name': 'pro',
1978          'ro.boot.product.product_name': 'product2',
1979          'ro.boot.product.not_care': 'not_care',
1980      }
1981      partition_props = common.PartitionBuildProps.FromInputFile(
1982          input_zip, 'odm', placeholder_values)
1983
1984    self.assertEqual({
1985        'ro.odm.build.date.utc': '1578430045',
1986        'ro.odm.build.fingerprint':
1987        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1988        'ro.product.odm.device': 'coralpro',
1989        'ro.product.odm.name': 'product2'
1990    }, partition_props.build_props)
1991
1992  def test_parseBuildProps_defineAfterOverride(self):
1993    build_prop = copy.deepcopy(self.odm_build_prop)
1994    build_prop.append('ro.product.odm.device=coral')
1995
1996    build_std_prop = [
1997        'ro.product.odm.device=coral',
1998    ]
1999    build_pro_prop = [
2000        'ro.product.odm.device=coralpro',
2001    ]
2002
2003    input_file = self._BuildZipFile({
2004        'ODM/etc/build.prop': '\n'.join(build_prop),
2005        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2006        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2007    })
2008
2009    with zipfile.ZipFile(input_file, 'r') as input_zip:
2010      placeholder_values = {
2011          'ro.boot.product.device_name': 'std',
2012      }
2013
2014      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2015                        input_zip, 'odm', placeholder_values)
2016
2017  def test_parseBuildProps_duplicateOverride(self):
2018    build_prop = copy.deepcopy(self.odm_build_prop)
2019    build_prop.append(
2020        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2021
2022    build_std_prop = [
2023        'ro.product.odm.device=coral',
2024        'ro.product.odm.name=product1',
2025    ]
2026    build_pro_prop = [
2027        'ro.product.odm.device=coralpro',
2028    ]
2029
2030    product1_prop = [
2031        'ro.product.odm.name=product1',
2032    ]
2033
2034    product2_prop = [
2035        'ro.product.odm.name=product2',
2036    ]
2037
2038    input_file = self._BuildZipFile({
2039        'ODM/etc/build.prop': '\n'.join(build_prop),
2040        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2041        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2042        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2043        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2044    })
2045
2046    with zipfile.ZipFile(input_file, 'r') as input_zip:
2047      placeholder_values = {
2048          'ro.boot.product.device_name': 'std',
2049          'ro.boot.product.product_name': 'product1',
2050      }
2051      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2052                        input_zip, 'odm', placeholder_values)
2053