#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import subprocess
import tempfile
import time
import zipfile
from hashlib import sha1

import common
import test_utils
import validate_target_files
from rangelib import RangeSet

from blockimgdiff import EmptyImage, DataImage

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
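  # Yields just over 2 GiB of data in 4 MiB steps: a 4 KiB random block
  # followed by zero padding, so the large-file tests never need to hold the
  # whole payload in memory and the result stays compressible.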
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield '\0' * (step_size - block_size)


class CommonZipTest(test_utils.ReleaseToolsTestCase):

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify.
    zip_file = zipfile.ZipFile(zip_file_name, "r")

    # Verify the timestamp.
    info = zip_file.getinfo(arcname)
    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

    # Verify the file mode.
    mode = (info.external_attr >> 16) & 0o777
    self.assertEqual(mode, expected_mode)

    # Verify the compress type.
    self.assertEqual(info.compress_type, expected_compress_type)

    # Verify the zip contents.
    entry = zip_file.open(arcname)
    sha1_hash = sha1()
    for chunk in iter(lambda: entry.read(4 * MiB), ''):
      sha1_hash.update(chunk)
    self.assertEqual(expected_hash, sha1_hash.hexdigest())
    self.assertIsNone(zip_file.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        expected_mode = extra_args.get("perms",
                                       zinfo_or_arcname.external_attr >> 16)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
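    # common.ZipWrite()/ZipWriteStr() temporarily bump zipfile.ZIP64_LIMIT to
    # work around the 2 GiB limit when writing large entries; this helper
    # asserts that the default limit ((1 << 31) - 1) is restored after |func|
    # has run.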
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")

  def test_bug21309935(self):
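    # Regression test for Bug 21309935: ZipWriteStr() should default to 0o644
    # when given an arcname, honor an explicit perms= argument, and otherwise
    # keep the perms already encoded in a ZipInfo's external_attr.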
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms take priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, "foo",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)

  def test_ZipDelete(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    output_zip = zipfile.ZipFile(zip_file.name, 'w',
                                 compression=zipfile.ZIP_DEFLATED)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(os.urandom(1024))
      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
      common.ZipClose(output_zip)
    zip_file.close()

    try:
      common.ZipDelete(zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertTrue('Test3' in entries)

      self.assertRaises(
          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertTrue('Test3' in entries)

      common.ZipDelete(zip_file.name, ['Test3'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertFalse('Test3' in entries)

      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertFalse('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertFalse('Test3' in entries)
    finally:
      os.remove(zip_file.name)

  @staticmethod
  def _test_UnzipTemp_createZipFile():
    zip_file = common.MakeTempFile(suffix='.zip')
    output_zip = zipfile.ZipFile(
        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
    contents = os.urandom(1024)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(contents)
      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
      common.ZipClose(output_zip)
    return zip_file

  def test_UnzipTemp(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file)
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  def test_UnzipTemp_withPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()

    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  def test_UnzipTemp_withEmptyPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, [])
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))

  def test_UnzipTemp_withNoMatchingPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))


class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
  """Tests the APK utils related functions."""

  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/target/product/security/platform.x509.pem"'
      ' private_key="build/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  APKCERTS_CERTMAP2 = {
      'Compressed1.apk' : 'certs/compressed1',
      'Compressed2a.apk' : 'certs/compressed2',
      'Compressed2b.apk' : 'certs/compressed2',
      'Compressed3.apk' : 'certs/compressed3',
  }

  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  APKCERTS_CERTMAP3 = {
      'Compressed4.apk' : 'certs/compressed4',
  }

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

  @staticmethod
  def _write_apkcerts_txt(apkcerts_txt, additional=None):
    if additional is None:
      additional = []
    target_files = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
      for entry in additional:
        target_files_zip.writestr(entry, '')
    return target_files

  def test_ReadApkCerts_NoncompressedApks(self):
    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    self.assertIsNone(ext)

  def test_ReadApkCerts_CompressedApks(self):
    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
    # not stored in '.gz' format, so it shouldn't be considered as installed.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    self.assertEqual('.gz', ext)

    # Alternative case with '.xz'.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    self.assertEqual('.xz', ext)

  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    certmap_merged = self.APKCERTS_CERTMAP1.copy()
    certmap_merged.update(self.APKCERTS_CERTMAP2)
    self.assertDictEqual(certmap_merged, certmap)
    self.assertEqual('.gz', ext)

  def test_ReadApkCerts_MultipleCompressionMethods(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ReadApkCerts_MismatchingKeys(self):
    malformed_apkcerts_txt = (
        'name="App1.apk" certificate="certs/cert1.x509.pem"'
        ' private_key="certs/cert2.pk8"\n'
    )
    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ExtractPublicKey(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(pubkey, 'rb') as pubkey_fp:
      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))

  def test_ExtractPublicKey_invalidInput(self):
    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)

  def test_ExtractAvbPublicKey(self):
    privkey = os.path.join(self.testdata_dir, 'testkey.key')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(common.ExtractAvbPublicKey(privkey)) as privkey_fp, \
        open(common.ExtractAvbPublicKey(pubkey)) as pubkey_fp:
      self.assertEqual(privkey_fp.read(), pubkey_fp.read())

  def test_ParseCertificate(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')

    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    expected, _ = proc.communicate()
    self.assertEqual(0, proc.returncode)

    with open(cert) as cert_fp:
      actual = common.ParseCertificate(cert_fp.read())
    self.assertEqual(expected, actual)

  def test_GetMinSdkVersion(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual('24', common.GetMinSdkVersion(test_app))

  def test_GetMinSdkVersion_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')

  def test_GetMinSdkVersionInt(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))

  def test_GetMinSdkVersionInt_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
        {})


class CommonUtilsTest(test_utils.ReleaseToolsTestCase):

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

  def test_GetSparseImage_emptyBlockMapFile(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
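      # construct_sparse_image() takes (chunk_type, block_count) pairs; in the
      # Android sparse image format 0xCAC1 is a raw chunk, 0xCAC2 a fill chunk
      # and 0xCAC3 a "don't care" chunk.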
      target_files_zip.write(
          test_utils.construct_sparse_image([
              (0xCAC1, 6),
              (0xCAC3, 3),
              (0xCAC1, 4)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr('IMAGES/system.map', '')
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertDictEqual(
        {
            '__COPY': RangeSet("0"),
            '__NONZERO-0': RangeSet("1-5 9-12"),
        },
        sparse_image.file_map)

  def test_GetSparseImage_missingImageFile(self):
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
        None, False)
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
        None, False)

  def test_GetSparseImage_missingBlockMapFile(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([
              (0xCAC1, 6),
              (0xCAC3, 3),
              (0xCAC1, 4)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetSparseImage_sharedBlocks_notAllowed(self):
    """Tests the case of having overlapping blocks but disallowed."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      # Block 10 is shared between two files.
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 10-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetSparseImage_sharedBlocks_allowed(self):
    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      # Construct an image with a care_map of "0-5 9-12".
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      # Block 10 is shared between two files.
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 10-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)

    self.assertDictEqual(
        {
            '__COPY': RangeSet("0"),
            '__NONZERO-0': RangeSet("6-8 13-15"),
            '/system/file1': RangeSet("1-5 9-10"),
            '/system/file2': RangeSet("11-12"),
        },
        sparse_image.file_map)

    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
    # 'incomplete'.
    self.assertTrue(
        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
    self.assertNotIn(
        'incomplete', sparse_image.file_map['/system/file2'].extra)

    # All other entries should look normal without any tags.
    self.assertFalse(sparse_image.file_map['__COPY'].extra)
    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
    self.assertFalse(sparse_image.file_map['/system/file1'].extra)

  def test_GetSparseImage_incompleteRanges(self):
    """Tests the case of ext4 images with holes."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 11-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/system/file2' has fewer blocks listed (2) than actual (3).
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])

  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//system/file2 11-12',
              '/system/app/file3 13-15']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/system/file2' has fewer blocks listed (2) than actual (3).
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
      # '/system/app/file3' has fewer blocks listed (3) than actual (4).
      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
    self.assertTrue(
        sparse_image.file_map['/system/app/file3'].extra['incomplete'])

  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//init.rc 13-15']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/init.rc' has fewer blocks listed (3) than actual (4).
      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])

  def test_GetSparseImage_fileNotFound(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//system/file2 11-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetAvbChainedPartitionArg(self):
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
    }
    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('system', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
    key = os.path.join(self.testdata_dir, 'testkey.key')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_product_key_path': key,
        'avb_product_rollback_index_location': 2,
    }
    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('product', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': 'does-not-exist',
        'avb_system_rollback_index_location': 2,
    }
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    args = common.GetAvbChainedPartitionArg(
        'system', info_dict, pubkey).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('system', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_invalidKey(self):
    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
    }
    self.assertRaises(
        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
        info_dict)

  INFO_DICT_DEFAULT = {
      'recovery_api_version': 3,
      'fstab_version': 2,
      'system_root_image': 'true',
      'no_recovery' : 'true',
      'recovery_as_boot': 'true',
  }

  @staticmethod
  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
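    # Builds a minimal target_files zip containing META/misc_info.txt (from
    # info_dict), a recovery fstab written at fstab_path, and a stub
    # META/file_contexts entry.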
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      info_values = ''.join(
          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.iteritems())])
      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)

      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
      if info_dict.get('system_root_image') == 'true':
        fstab_values = FSTAB_TEMPLATE.format('/')
      else:
        fstab_values = FSTAB_TEMPLATE.format('/system')
      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)

      common.ZipWriteStr(
          target_files_zip, 'META/file_contexts', 'file-contexts')
    return target_files

  def test_LoadInfoDict(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_dirInput(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_systemRootImageFalse(self):
    # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
    # launched prior to P will likely have this config.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['no_recovery']
    del info_dict['system_root_image']
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertNotIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_recoveryAsBootFalse(self):
    # Devices using system-as-root, but with standalone recovery image. Non-A/B
    # devices launched since P will likely have this config.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['no_recovery']
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_noRecoveryTrue(self):
    # Device doesn't have a recovery partition at all.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIsNone(loaded_dict['fstab'])

  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    common.ZipDelete(target_files, 'META/misc_info.txt')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)

  def test_LoadInfoDict_repacking(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped, True)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])
    self.assertEqual(
        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
    self.assertEqual(
        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
        loaded_dict['root_fs_config'])

  def test_LoadInfoDict_repackingWithZipFileInput(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      self.assertRaises(
          AssertionError, common.LoadInfoDict, target_files_zip, True)


class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
  """Checks the format of install-recovery.sh.

  Its format should match between common.py and validate_target_files.py.
  """

  def setUp(self):
    self._tempdir = common.MakeTempDir()
    # Create a dummy dict that contains the fstab info for boot&recovery.
    self._info = {"fstab" : {}}
    dummy_fstab = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
    # Construct the gzipped recovery.img and boot.img
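    # Presumably generated the same way as boot_data below:
    #   echo -n "recovery" | gzip -f | hd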
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
        0x08, 0x00, 0x00, 0x00
    ])
    # echo -n "boot" | gzip -f | hd
    self.boot_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    ])

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
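    # Writes |data| to <tempdir>/<prefix>/<name>, mimicking the output sink
    # callable that MakeRecoveryPatch() expects.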
    loc = os.path.join(self._tempdir, prefix, name)
    if not os.path.exists(os.path.dirname(loc)):
      os.makedirs(os.path.dirname(loc))
    with open(loc, "w+") as f:
      f.write(data)

  def test_full_recovery(self):
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_recovery_from_boot(self):
    recovery_image = common.File("recovery.img", self.recovery_data)
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", self.boot_data)
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)


class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.
  """
  def __init__(self, enable_comments=False):
    self.lines = []
    self.enable_comments = enable_comments
  def Comment(self, comment):
    if self.enable_comments:
      self.lines.append("# {}".format(comment))
  def AppendExtra(self, extra):
    self.lines.append(extra)
  def __str__(self):
    return "\n".join(self.lines)


class MockBlockDifference(object):
  def __init__(self, partition, tgt, src=None):
    self.partition = partition
    self.tgt = tgt
    self.src = src
  def WriteScript(self, script, _, progress=None,
                  write_verify_script=False):
    if progress:
      script.AppendExtra("progress({})".format(progress))
    script.AppendExtra("patch({});".format(self.partition))
    if write_verify_script:
      self.WritePostInstallVerifyScript(script)
  def WritePostInstallVerifyScript(self, script):
    script.AppendExtra("verify({});".format(self.partition))


class FakeSparseImage(object):
  def __init__(self, size):
    self.blocksize = 4096
    self.total_blocks = size // 4096
    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)


class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
  @staticmethod
  def get_op_list(output_path):
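    # Returns the non-comment lines of the dynamic_partitions_op_list entry
    # written into the output zip.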
    with zipfile.ZipFile(output_path, 'r') as output_zip:
      with output_zip.open("dynamic_partitions_op_list") as op_list:
        return [line.strip() for line in op_list.readlines()
                if not line.startswith("#")]

  def setUp(self):
    self.script = MockScriptWriter()
    self.output_path = common.MakeTempFile(suffix='.zip')

  def test_full(self):
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(vendor);
verify(vendor);
unmap_partition("vendor");
patch(system);
verify(system);
unmap_partition("system");
""".strip())

    lines = self.get_op_list(self.output_path)

    remove_all_groups = lines.index("remove_all_groups")
    add_group = lines.index("add_group group_foo 4294967296")
    add_vendor = lines.index("add vendor group_foo")
    add_system = lines.index("add system group_foo")
    resize_vendor = lines.index("resize vendor 1073741824")
    resize_system = lines.index("resize system 3221225472")

    self.assertLess(remove_all_groups, add_group,
                    "Should add groups after removing all groups")
    self.assertLess(add_group, min(add_vendor, add_system),
                    "Should add partitions after adding group")
    self.assertLess(add_system, resize_system,
                    "Should resize system after adding it")
    self.assertLess(add_vendor, resize_vendor,
                    "Should resize vendor after adding it")

  def test_inc_groups(self):
    source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
           group_qux_size=1 * GiB).split("\n"))

    dp_diff = common.DynamicPartitionsDifference(target_info,
                                                 block_diffs=[],
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    lines = self.get_op_list(self.output_path)

    removed = lines.index("remove_group group_bar")
    shrunk = lines.index("resize_group group_foo 3221225472")
    grown = lines.index("resize_group group_baz 4294967296")
    added = lines.index("add_group group_qux 1073741824")

    self.assertLess(max(removed, shrunk), min(grown, added),
                    "ops that remove / shrink partitions must precede ops that "
                    "grow / add partitions")

  def test_incremental(self):
    source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product product_services
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product product_services
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))

    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product_services", None,
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
                                       src=None)]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    metadata_idx = self.script.lines.index(
        'assert(update_dynamic_partitions(package_extract_file('
        '"dynamic_partitions_op_list")));')
    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
    for p in ("product", "system", "odm"):
      patch_idx = self.script.lines.index("patch({});".format(p))
      verify_idx = self.script.lines.index("verify({});".format(p))
      self.assertLess(metadata_idx, patch_idx,
                      "Should patch {} after updating metadata".format(p))
      self.assertLess(patch_idx, verify_idx,
                      "Should verify {} after patching".format(p))

    self.assertNotIn("patch(product_services);", self.script.lines)

    lines = self.get_op_list(self.output_path)

    remove = lines.index("remove product_services")
    move_product_out = lines.index("move product default")
    shrink = lines.index("resize vendor 536870912")
    shrink_group = lines.index("resize_group group_foo 3221225472")
    add_group_bar = lines.index("add_group group_bar 1073741824")
    add_odm = lines.index("add odm group_foo")
    grow_existing = lines.index("resize system 1610612736")
    grow_added = lines.index("resize odm 1073741824")
    move_product_in = lines.index("move product group_bar")

    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)

    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
                    "Must shrink group after partitions inside group are shrunk"
                    " / removed")

    self.assertLess(add_group_bar, move_product_in,
                    "Must add partitions to group after group is added")

    self.assertLess(max_idx_move_partition_out_foo,
                    min_idx_move_partition_in_foo,
                    "Must shrink partitions / remove partitions from group "
                    "before adding / moving partitions into group")

  def test_remove_partition(self):
    source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))

    common.OPTIONS.info_dict = target_info
    common.OPTIONS.target_info_dict = target_info
    common.OPTIONS.source_info_dict = source_info
    common.OPTIONS.cache_size = 4 * 4096

    block_diffs = [common.BlockDifference("foo", EmptyImage(),
                                          src=DataImage("source", pad=True))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertNotIn("block_image_update", str(self.script),
                     "Removed partition should not be patched.")

    lines = self.get_op_list(self.output_path)
    self.assertEqual(lines, ["remove foo"])