• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import os.path
20import zipfile
21
22import common
23import ota_metadata_pb2
24import test_utils
25from ota_utils import (
26    BuildLegacyOtaMetadata, CalculateRuntimeDevicesAndFingerprints,
27    ConstructOtaApexInfo, FinalizeMetadata, GetPackageMetadata, PropertyFiles)
28from ota_from_target_files import (
29    _LoadOemDicts, AbOtaPropertyFiles,
30    GetTargetFilesZipForCustomImagesUpdates,
31    GetTargetFilesZipForPartialUpdates,
32    GetTargetFilesZipForSecondaryImages,
33    GetTargetFilesZipWithoutPostinstallConfig,
34    Payload, PayloadSigner, POSTINSTALL_CONFIG,
35    StreamingPropertyFiles, AB_PARTITIONS)
36from apex_utils import GetApexInfoFromTargetFiles
37from test_utils import PropertyFilesTestCase
38
39
def construct_target_files(secondary=False, compressedApex=False):
  """Builds a small fake target-files zip and returns its path.

  The archive holds META/update_engine_config.txt, the postinstall config,
  META/ab_partitions.txt and one random-content image per listed partition.

  Args:
    secondary: When true, IMAGES/system_other.img is added as well.
    compressedApex: When true, the compressed test APEX found next to this
        test is added under SYSTEM/apex/.

  Returns:
    The path of the zip file, as obtained from common.MakeTempFile.
  """
  postinstall_lines = [
      "RUN_POSTINSTALL_system=true",
      "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
      "FILESYSTEM_TYPE_system=ext4",
      "POSTINSTALL_OPTIONAL_system=true",
  ]
  # (directory inside the zip, partition name) for every A/B partition.
  image_locations = [
      ('IMAGES', 'boot'),
      ('IMAGES', 'system'),
      ('IMAGES', 'vendor'),
      ('RADIO', 'bootloader'),
      ('RADIO', 'modem'),
  ]

  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as out_zip:
    out_zip.writestr(
        'META/update_engine_config.txt',
        "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
    out_zip.writestr(POSTINSTALL_CONFIG, '\n'.join(postinstall_lines))
    out_zip.writestr(
        'META/ab_partitions.txt',
        '\n'.join(name for _, name in image_locations))

    # Each fake image is just len(name) random bytes.
    for directory, name in image_locations:
      image_entry = '{}/{}.img'.format(directory, name)
      out_zip.writestr(image_entry, os.urandom(len(name)))

    # system_other shouldn't appear in META/ab_partitions.txt.
    if secondary:
      out_zip.writestr(
          'IMAGES/system_other.img', os.urandom(len("system_other")))

    if compressedApex:
      capex_name = 'com.android.apex.compressed.v1.capex'
      capex_path = os.path.join(test_utils.get_current_dir(), capex_name)
      out_zip.write(capex_path, 'SYSTEM/apex/' + capex_name)

  return zip_path
88
89
class LoadOemDictsTest(test_utils.ReleaseToolsTestCase):
  """Exercises _LoadOemDicts with no source, one file and several files."""

  def test_NoneDict(self):
    """Passing None yields None."""
    loaded = _LoadOemDicts(None)
    self.assertIsNone(loaded)

  def test_SingleDict(self):
    """A single key=value file is loaded as one dict."""
    source_path = common.MakeTempFile()
    with open(source_path, 'w') as out:
      out.write('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')

    loaded = _LoadOemDicts([source_path])
    self.assertEqual(1, len(loaded))
    only_dict = loaded[0]
    self.assertEqual('foo', only_dict['xyz'])
    self.assertEqual('bar', only_dict['a.b.c'])

  def test_MultipleDicts(self):
    """Three files are loaded as three dicts, in the order given."""
    source_paths = []
    for index in range(3):
      path = common.MakeTempFile()
      contents = 'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'.format(
          index)
      with open(path, 'w') as out:
        out.write(contents)
      source_paths.append(path)

    loaded = _LoadOemDicts(source_paths)
    self.assertEqual(3, len(loaded))
    position = 0
    for props in loaded:
      self.assertEqual('2', props['def'])
      self.assertEqual('foo', props['xyz'])
      self.assertEqual('bar', props['a.b.c'])
      self.assertEqual(str(position), props['ro.build.index'])
      position += 1
121
122
class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
  """Tests for OTA package metadata and the target-files zip helpers."""

  # Info dict of the build being updated to (SDK 27, build date 1500000000).
  TEST_TARGET_INFO_DICT = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system',
          {
              'ro.product.device': 'product-device',
              'ro.build.fingerprint': 'build-fingerprint-target',
              'ro.build.version.incremental':
                  'build-version-incremental-target',
              'ro.build.version.sdk': '27',
              'ro.build.version.security_patch': '2017-12-01',
              'ro.build.date.utc': '1500000000',
          },
      ),
  }

  # Info dict of the build being updated from (SDK 25, build date 1400000000).
  TEST_SOURCE_INFO_DICT = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system',
          {
              'ro.product.device': 'product-device',
              'ro.build.fingerprint': 'build-fingerprint-source',
              'ro.build.version.incremental':
                  'build-version-incremental-source',
              'ro.build.version.sdk': '25',
              'ro.build.version.security_patch': '2016-12-01',
              'ro.build.date.utc': '1400000000',
          },
      ),
  }

  # Info dict that declares 'oem_fingerprint_properties'.
  TEST_INFO_DICT_USES_OEM_PROPS = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system',
          {
              'ro.product.name': 'product-name',
              'ro.build.thumbprint': 'build-thumbprint',
              'ro.build.bar': 'build-bar',
          },
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor',
          {
              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint',
          },
      ),
      'property1': 'value1',
      'property2': 4096,
      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
  }
163
164  def setUp(self):
165    self.testdata_dir = test_utils.get_testdata_dir()
166    self.assertTrue(os.path.exists(self.testdata_dir))
167
168    # Reset the global options as in ota_from_target_files.py.
169    common.OPTIONS.incremental_source = None
170    common.OPTIONS.downgrade = False
171    common.OPTIONS.retrofit_dynamic_partitions = False
172    common.OPTIONS.timestamp = False
173    common.OPTIONS.wipe_user_data = False
174    common.OPTIONS.no_signing = False
175    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
176    common.OPTIONS.key_passwords = {
177        common.OPTIONS.package_key: None,
178    }
179
180    common.OPTIONS.search_path = test_utils.get_search_path()
181
182  @staticmethod
183  def GetLegacyOtaMetadata(target_info, source_info=None):
184    metadata_proto = GetPackageMetadata(target_info, source_info)
185    return BuildLegacyOtaMetadata(metadata_proto)
186
187  def test_GetPackageMetadata_abOta_full(self):
188    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
189    target_info_dict['ab_update'] = 'true'
190    target_info_dict['ab_partitions'] = []
191    target_info = common.BuildInfo(target_info_dict, None)
192    metadata = self.GetLegacyOtaMetadata(target_info)
193    self.assertDictEqual(
194        {
195            'ota-type': 'AB',
196            'ota-required-cache': '0',
197            'post-build': 'build-fingerprint-target',
198            'post-build-incremental': 'build-version-incremental-target',
199            'post-sdk-level': '27',
200            'post-security-patch-level': '2017-12-01',
201            'post-timestamp': '1500000000',
202            'pre-device': 'product-device',
203        },
204        metadata)
205
206  def test_GetPackageMetadata_abOta_incremental(self):
207    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
208    target_info_dict['ab_update'] = 'true'
209    target_info_dict['ab_partitions'] = []
210    target_info = common.BuildInfo(target_info_dict, None)
211    source_info = common.BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
212    common.OPTIONS.incremental_source = ''
213    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
214    self.assertDictEqual(
215        {
216            'ota-type': 'AB',
217            'ota-required-cache': '0',
218            'post-build': 'build-fingerprint-target',
219            'post-build-incremental': 'build-version-incremental-target',
220            'post-sdk-level': '27',
221            'post-security-patch-level': '2017-12-01',
222            'post-timestamp': '1500000000',
223            'pre-device': 'product-device',
224            'pre-build': 'build-fingerprint-source',
225            'pre-build-incremental': 'build-version-incremental-source',
226        },
227        metadata)
228
229  def test_GetPackageMetadata_nonAbOta_full(self):
230    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
231    metadata = self.GetLegacyOtaMetadata(target_info)
232    self.assertDictEqual(
233        {
234            'ota-type': 'BLOCK',
235            'ota-required-cache': '0',
236            'post-build': 'build-fingerprint-target',
237            'post-build-incremental': 'build-version-incremental-target',
238            'post-sdk-level': '27',
239            'post-security-patch-level': '2017-12-01',
240            'post-timestamp': '1500000000',
241            'pre-device': 'product-device',
242        },
243        metadata)
244
245  def test_GetPackageMetadata_nonAbOta_incremental(self):
246    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
247    source_info = common.BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
248    common.OPTIONS.incremental_source = ''
249    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
250    self.assertDictEqual(
251        {
252            'ota-type': 'BLOCK',
253            'ota-required-cache': '0',
254            'post-build': 'build-fingerprint-target',
255            'post-build-incremental': 'build-version-incremental-target',
256            'post-sdk-level': '27',
257            'post-security-patch-level': '2017-12-01',
258            'post-timestamp': '1500000000',
259            'pre-device': 'product-device',
260            'pre-build': 'build-fingerprint-source',
261            'pre-build-incremental': 'build-version-incremental-source',
262        },
263        metadata)
264
265  def test_GetPackageMetadata_wipe(self):
266    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
267    common.OPTIONS.wipe_user_data = True
268    metadata = self.GetLegacyOtaMetadata(target_info)
269    self.assertDictEqual(
270        {
271            'ota-type': 'BLOCK',
272            'ota-required-cache': '0',
273            'ota-wipe': 'yes',
274            'post-build': 'build-fingerprint-target',
275            'post-build-incremental': 'build-version-incremental-target',
276            'post-sdk-level': '27',
277            'post-security-patch-level': '2017-12-01',
278            'post-timestamp': '1500000000',
279            'pre-device': 'product-device',
280        },
281        metadata)
282
283  @test_utils.SkipIfExternalToolsUnavailable()
284  def test_GetApexInfoFromTargetFiles(self):
285    target_files = construct_target_files(compressedApex=True)
286    apex_infos = GetApexInfoFromTargetFiles(target_files, 'system')
287    self.assertEqual(len(apex_infos), 1)
288    self.assertEqual(apex_infos[0].package_name, "com.android.apex.compressed")
289    self.assertEqual(apex_infos[0].version, 1)
290    self.assertEqual(apex_infos[0].is_compressed, True)
291    # Compare the decompressed APEX size with the original uncompressed APEX
292    original_apex_name = 'com.android.apex.compressed.v1_original.apex'
293    original_apex_filepath = os.path.join(
294        test_utils.get_current_dir(), original_apex_name)
295    uncompressed_apex_size = os.path.getsize(original_apex_filepath)
296    self.assertEqual(apex_infos[0].decompressed_size, uncompressed_apex_size)
297
298  @staticmethod
299  def construct_tf_with_apex_info(infos):
300    apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
301    apex_metadata_proto.apex_info.extend(infos)
302
303    output = common.MakeTempFile(suffix='.zip')
304    with zipfile.ZipFile(output, 'w') as zfp:
305      common.ZipWriteStr(zfp, "META/apex_info.pb",
306                         apex_metadata_proto.SerializeToString())
307    return output
308
309  def test_ConstructOtaApexInfo_incremental_package(self):
310    infos = [ota_metadata_pb2.ApexInfo(package_name='com.android.apex.1',
311                                       version=1000, is_compressed=False),
312             ota_metadata_pb2.ApexInfo(package_name='com.android.apex.2',
313                                       version=2000, is_compressed=True)]
314    target_file = self.construct_tf_with_apex_info(infos)
315
316    with zipfile.ZipFile(target_file) as target_zip:
317      info_bytes = ConstructOtaApexInfo(target_zip, source_file=target_file)
318    apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
319    apex_metadata_proto.ParseFromString(info_bytes)
320
321    info_list = apex_metadata_proto.apex_info
322    self.assertEqual(2, len(info_list))
323    self.assertEqual('com.android.apex.1', info_list[0].package_name)
324    self.assertEqual(1000, info_list[0].version)
325    self.assertEqual(1000, info_list[0].source_version)
326
327  def test_GetPackageMetadata_retrofitDynamicPartitions(self):
328    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
329    common.OPTIONS.retrofit_dynamic_partitions = True
330    metadata = self.GetLegacyOtaMetadata(target_info)
331    self.assertDictEqual(
332        {
333            'ota-retrofit-dynamic-partitions': 'yes',
334            'ota-type': 'BLOCK',
335            'ota-required-cache': '0',
336            'post-build': 'build-fingerprint-target',
337            'post-build-incremental': 'build-version-incremental-target',
338            'post-sdk-level': '27',
339            'post-security-patch-level': '2017-12-01',
340            'post-timestamp': '1500000000',
341            'pre-device': 'product-device',
342        },
343        metadata)
344
345  @staticmethod
346  def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
347    (target_info['build.prop'].build_props['ro.build.date.utc'],
348     source_info['build.prop'].build_props['ro.build.date.utc']) = (
349         source_info['build.prop'].build_props['ro.build.date.utc'],
350         target_info['build.prop'].build_props['ro.build.date.utc'])
351
352  def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
353    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
354    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
355    self._test_GetPackageMetadata_swapBuildTimestamps(
356        target_info_dict, source_info_dict)
357
358    target_info = common.BuildInfo(target_info_dict, None)
359    source_info = common.BuildInfo(source_info_dict, None)
360    common.OPTIONS.incremental_source = ''
361    self.assertRaises(RuntimeError, self.GetLegacyOtaMetadata, target_info,
362                      source_info)
363
364  def test_GetPackageMetadata_downgrade(self):
365    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
366    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
367    self._test_GetPackageMetadata_swapBuildTimestamps(
368        target_info_dict, source_info_dict)
369
370    target_info = common.BuildInfo(target_info_dict, None)
371    source_info = common.BuildInfo(source_info_dict, None)
372    common.OPTIONS.incremental_source = ''
373    common.OPTIONS.downgrade = True
374    common.OPTIONS.wipe_user_data = True
375    common.OPTIONS.spl_downgrade = True
376    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
377    # Reset spl_downgrade so other tests are unaffected
378    common.OPTIONS.spl_downgrade = False
379
380    self.assertDictEqual(
381        {
382            'ota-downgrade': 'yes',
383            'ota-type': 'BLOCK',
384            'ota-required-cache': '0',
385            'ota-wipe': 'yes',
386            'post-build': 'build-fingerprint-target',
387            'post-build-incremental': 'build-version-incremental-target',
388            'post-sdk-level': '27',
389            'post-security-patch-level': '2017-12-01',
390            'post-timestamp': '1400000000',
391            'pre-device': 'product-device',
392            'pre-build': 'build-fingerprint-source',
393            'pre-build-incremental': 'build-version-incremental-source',
394            'spl-downgrade': 'yes',
395        },
396        metadata)
397
398  @test_utils.SkipIfExternalToolsUnavailable()
399  def test_GetTargetFilesZipForSecondaryImages(self):
400    input_file = construct_target_files(secondary=True)
401    target_file = GetTargetFilesZipForSecondaryImages(input_file)
402
403    with zipfile.ZipFile(target_file) as verify_zip:
404      namelist = verify_zip.namelist()
405      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
406
407    self.assertIn('META/ab_partitions.txt', namelist)
408    self.assertIn('IMAGES/system.img', namelist)
409    self.assertIn('RADIO/bootloader.img', namelist)
410    self.assertIn(POSTINSTALL_CONFIG, namelist)
411
412    self.assertNotIn('IMAGES/boot.img', namelist)
413    self.assertNotIn('IMAGES/system_other.img', namelist)
414    self.assertNotIn('IMAGES/system.map', namelist)
415    self.assertNotIn('RADIO/modem.img', namelist)
416
417    expected_ab_partitions = ['system', 'bootloader']
418    self.assertEqual('\n'.join(expected_ab_partitions), ab_partitions)
419
420  @test_utils.SkipIfExternalToolsUnavailable()
421  def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
422    input_file = construct_target_files(secondary=True)
423    target_file = GetTargetFilesZipForSecondaryImages(
424        input_file, skip_postinstall=True)
425
426    with zipfile.ZipFile(target_file) as verify_zip:
427      namelist = verify_zip.namelist()
428
429    self.assertIn('META/ab_partitions.txt', namelist)
430    self.assertIn('IMAGES/system.img', namelist)
431    self.assertIn('RADIO/bootloader.img', namelist)
432
433    self.assertNotIn('IMAGES/boot.img', namelist)
434    self.assertNotIn('IMAGES/system_other.img', namelist)
435    self.assertNotIn('IMAGES/system.map', namelist)
436    self.assertNotIn('RADIO/modem.img', namelist)
437    self.assertNotIn(POSTINSTALL_CONFIG, namelist)
438
439  @test_utils.SkipIfExternalToolsUnavailable()
440  def test_GetTargetFilesZipForSecondaryImages_withoutRadioImages(self):
441    input_file = construct_target_files(secondary=True)
442    common.ZipDelete(input_file, 'RADIO/bootloader.img')
443    common.ZipDelete(input_file, 'RADIO/modem.img')
444    target_file = GetTargetFilesZipForSecondaryImages(input_file)
445
446    with zipfile.ZipFile(target_file) as verify_zip:
447      namelist = verify_zip.namelist()
448
449    self.assertIn('META/ab_partitions.txt', namelist)
450    self.assertIn('IMAGES/system.img', namelist)
451    self.assertIn(POSTINSTALL_CONFIG, namelist)
452
453    self.assertNotIn('IMAGES/boot.img', namelist)
454    self.assertNotIn('IMAGES/system_other.img', namelist)
455    self.assertNotIn('IMAGES/system.map', namelist)
456    self.assertNotIn('RADIO/bootloader.img', namelist)
457    self.assertNotIn('RADIO/modem.img', namelist)
458
459  @test_utils.SkipIfExternalToolsUnavailable()
460  def test_GetTargetFilesZipForSecondaryImages_dynamicPartitions(self):
461    input_file = construct_target_files(secondary=True)
462    misc_info = '\n'.join([
463        'use_dynamic_partition_size=true',
464        'use_dynamic_partitions=true',
465        'dynamic_partition_list=system vendor product',
466        'super_partition_groups=google_dynamic_partitions',
467        'super_google_dynamic_partitions_group_size=4873781248',
468        'super_google_dynamic_partitions_partition_list=system vendor product',
469    ])
470    dynamic_partitions_info = '\n'.join([
471        'super_partition_groups=google_dynamic_partitions',
472        'super_google_dynamic_partitions_group_size=4873781248',
473        'super_google_dynamic_partitions_partition_list=system vendor product',
474    ])
475
476    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
477      common.ZipWriteStr(append_zip, 'META/misc_info.txt', misc_info)
478      common.ZipWriteStr(append_zip, 'META/dynamic_partitions_info.txt',
479                         dynamic_partitions_info)
480
481    target_file = GetTargetFilesZipForSecondaryImages(input_file)
482
483    with zipfile.ZipFile(target_file) as verify_zip:
484      namelist = verify_zip.namelist()
485      updated_misc_info = verify_zip.read('META/misc_info.txt').decode()
486      updated_dynamic_partitions_info = verify_zip.read(
487          'META/dynamic_partitions_info.txt').decode()
488
489    self.assertIn('META/ab_partitions.txt', namelist)
490    self.assertIn('IMAGES/system.img', namelist)
491    self.assertIn(POSTINSTALL_CONFIG, namelist)
492    self.assertIn('META/misc_info.txt', namelist)
493    self.assertIn('META/dynamic_partitions_info.txt', namelist)
494
495    self.assertNotIn('IMAGES/boot.img', namelist)
496    self.assertNotIn('IMAGES/system_other.img', namelist)
497    self.assertNotIn('IMAGES/system.map', namelist)
498
499    # Check the vendor & product are removed from the partitions list.
500    expected_misc_info = misc_info.replace('system vendor product',
501                                           'system')
502    expected_dynamic_partitions_info = dynamic_partitions_info.replace(
503        'system vendor product', 'system')
504    self.assertEqual(expected_misc_info, updated_misc_info)
505    self.assertEqual(expected_dynamic_partitions_info,
506                     updated_dynamic_partitions_info)
507
508  @test_utils.SkipIfExternalToolsUnavailable()
509  def test_GetTargetFilesZipForPartialUpdates_singlePartition(self):
510    input_file = construct_target_files()
511    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
512      common.ZipWriteStr(append_zip, 'IMAGES/system.map', 'fake map')
513
514    target_file = GetTargetFilesZipForPartialUpdates(input_file, ['system'])
515    with zipfile.ZipFile(target_file) as verify_zip:
516      namelist = verify_zip.namelist()
517      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
518
519    self.assertIn('META/ab_partitions.txt', namelist)
520    self.assertIn('META/update_engine_config.txt', namelist)
521    self.assertIn('IMAGES/system.img', namelist)
522    self.assertIn('IMAGES/system.map', namelist)
523
524    self.assertNotIn('IMAGES/boot.img', namelist)
525    self.assertNotIn('IMAGES/system_other.img', namelist)
526    self.assertNotIn('RADIO/bootloader.img', namelist)
527    self.assertNotIn('RADIO/modem.img', namelist)
528
529    self.assertEqual('system', ab_partitions)
530
531  @test_utils.SkipIfExternalToolsUnavailable()
532  def test_GetTargetFilesZipForPartialUpdates_unrecognizedPartition(self):
533    input_file = construct_target_files()
534    self.assertRaises(ValueError, GetTargetFilesZipForPartialUpdates,
535                      input_file, ['product'])
536
537  @test_utils.SkipIfExternalToolsUnavailable()
538  def test_GetTargetFilesZipForPartialUpdates_dynamicPartitions(self):
539    input_file = construct_target_files(secondary=True)
540    misc_info = '\n'.join([
541        'use_dynamic_partition_size=true',
542        'use_dynamic_partitions=true',
543        'dynamic_partition_list=system vendor product',
544        'super_partition_groups=google_dynamic_partitions',
545        'super_google_dynamic_partitions_group_size=4873781248',
546        'super_google_dynamic_partitions_partition_list=system vendor product',
547    ])
548    dynamic_partitions_info = '\n'.join([
549        'super_partition_groups=google_dynamic_partitions',
550        'super_google_dynamic_partitions_group_size=4873781248',
551        'super_google_dynamic_partitions_partition_list=system vendor product',
552    ])
553
554    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
555      common.ZipWriteStr(append_zip, 'META/misc_info.txt', misc_info)
556      common.ZipWriteStr(append_zip, 'META/dynamic_partitions_info.txt',
557                         dynamic_partitions_info)
558
559    target_file = GetTargetFilesZipForPartialUpdates(input_file,
560                                                     ['boot', 'system'])
561    with zipfile.ZipFile(target_file) as verify_zip:
562      namelist = verify_zip.namelist()
563      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
564      updated_misc_info = verify_zip.read('META/misc_info.txt').decode()
565      updated_dynamic_partitions_info = verify_zip.read(
566          'META/dynamic_partitions_info.txt').decode()
567
568    self.assertIn('META/ab_partitions.txt', namelist)
569    self.assertIn('IMAGES/boot.img', namelist)
570    self.assertIn('IMAGES/system.img', namelist)
571    self.assertIn('META/misc_info.txt', namelist)
572    self.assertIn('META/dynamic_partitions_info.txt', namelist)
573
574    self.assertNotIn('IMAGES/system_other.img', namelist)
575    self.assertNotIn('RADIO/bootloader.img', namelist)
576    self.assertNotIn('RADIO/modem.img', namelist)
577
578    # Check the vendor & product are removed from the partitions list.
579    expected_misc_info = misc_info.replace('system vendor product',
580                                           'system')
581    expected_dynamic_partitions_info = dynamic_partitions_info.replace(
582        'system vendor product', 'system')
583    self.assertEqual(expected_misc_info, updated_misc_info)
584    self.assertEqual(expected_dynamic_partitions_info,
585                     updated_dynamic_partitions_info)
586    self.assertEqual('boot\nsystem', ab_partitions)
587
588  @test_utils.SkipIfExternalToolsUnavailable()
589  def test_GetTargetFilesZipWithoutPostinstallConfig(self):
590    input_file = construct_target_files()
591    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
592    with zipfile.ZipFile(target_file) as verify_zip:
593      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
594
595  @test_utils.SkipIfExternalToolsUnavailable()
596  def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
597    input_file = construct_target_files()
598    common.ZipDelete(input_file, POSTINSTALL_CONFIG)
599    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
600    with zipfile.ZipFile(target_file) as verify_zip:
601      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
602
603  @test_utils.SkipIfExternalToolsUnavailable()
604  def test_GetTargetFilesZipForCustomImagesUpdates_oemDefaultImage(self):
605    input_file = construct_target_files()
606    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
607      common.ZipWriteStr(append_zip, 'IMAGES/oem.img', 'oem')
608      common.ZipWriteStr(append_zip, 'IMAGES/oem_test.img', 'oem_test')
609
610    target_file = GetTargetFilesZipForCustomImagesUpdates(
611        input_file, {'oem': 'oem.img'})
612
613    with zipfile.ZipFile(target_file) as verify_zip:
614      namelist = verify_zip.namelist()
615      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
616      oem_image = verify_zip.read('IMAGES/oem.img').decode()
617
618    self.assertIn('META/ab_partitions.txt', namelist)
619    self.assertEqual('boot\nsystem\nvendor\nbootloader\nmodem', ab_partitions)
620    self.assertIn('IMAGES/oem.img', namelist)
621    self.assertEqual('oem', oem_image)
622
623  @test_utils.SkipIfExternalToolsUnavailable()
624  def test_GetTargetFilesZipForCustomImagesUpdates_oemTestImage(self):
625    input_file = construct_target_files()
626    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
627      common.ZipWriteStr(append_zip, 'IMAGES/oem.img', 'oem')
628      common.ZipWriteStr(append_zip, 'IMAGES/oem_test.img', 'oem_test')
629
630    target_file = GetTargetFilesZipForCustomImagesUpdates(
631        input_file, {'oem': 'oem_test.img'})
632
633    with zipfile.ZipFile(target_file) as verify_zip:
634      namelist = verify_zip.namelist()
635      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
636      oem_image = verify_zip.read('IMAGES/oem.img').decode()
637
638    self.assertIn('META/ab_partitions.txt', namelist)
639    self.assertEqual('boot\nsystem\nvendor\nbootloader\nmodem', ab_partitions)
640    self.assertIn('IMAGES/oem.img', namelist)
641    self.assertEqual('oem_test', oem_image)
642
643  def _test_FinalizeMetadata(self, large_entry=False):
644    entries = [
645        'required-entry1',
646        'required-entry2',
647    ]
648    zip_file = PropertyFilesTest.construct_zip_package(entries)
649    # Add a large entry of 1 GiB if requested.
650    if large_entry:
651      with zipfile.ZipFile(zip_file, 'a', allowZip64=True) as zip_fp:
652        zip_fp.writestr(
653            # Using 'zoo' so that the entry stays behind others after signing.
654            'zoo',
655            'A' * 1024 * 1024 * 1024,
656            zipfile.ZIP_STORED)
657
658    metadata = ota_metadata_pb2.OtaMetadata()
659    output_file = common.MakeTempFile(suffix='.zip')
660    needed_property_files = (
661        TestPropertyFiles(),
662    )
663    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
664    self.assertIn('ota-test-property-files', metadata.property_files)
665
666  @test_utils.SkipIfExternalToolsUnavailable()
667  def test_FinalizeMetadata(self):
668    self._test_FinalizeMetadata()
669
670  @test_utils.SkipIfExternalToolsUnavailable()
671  def test_FinalizeMetadata_withNoSigning(self):
672    common.OPTIONS.no_signing = True
673    self._test_FinalizeMetadata()
674
675  @test_utils.SkipIfExternalToolsUnavailable()
676  def test_FinalizeMetadata_largeEntry(self):
677    self._test_FinalizeMetadata(large_entry=True)
678
679  @test_utils.SkipIfExternalToolsUnavailable()
680  def test_FinalizeMetadata_largeEntry_withNoSigning(self):
681    common.OPTIONS.no_signing = True
682    self._test_FinalizeMetadata(large_entry=True)
683
684  @test_utils.SkipIfExternalToolsUnavailable()
685  def test_FinalizeMetadata_insufficientSpace(self):
686    entries = [
687        'required-entry1',
688        'required-entry2',
689        'optional-entry1',
690        'optional-entry2',
691    ]
692    zip_file = PropertyFilesTest.construct_zip_package(entries)
693    with zipfile.ZipFile(zip_file, 'a', allowZip64=True) as zip_fp:
694      zip_fp.writestr(
695          # 'foo-entry1' will appear ahead of all other entries (in alphabetical
696          # order) after the signing, which will in turn trigger the
697          # InsufficientSpaceException and an automatic retry.
698          'foo-entry1',
699          'A' * 1024 * 1024,
700          zipfile.ZIP_STORED)
701
702    metadata = ota_metadata_pb2.OtaMetadata()
703    needed_property_files = (
704        TestPropertyFiles(),
705    )
706    output_file = common.MakeTempFile(suffix='.zip')
707    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
708    self.assertIn('ota-test-property-files', metadata.property_files)
709
710
class TestPropertyFiles(PropertyFiles):
  """PropertyFiles subclass with fixed entry names, for use in tests."""

  def __init__(self):
    super().__init__()
    self.name = 'ota-test-property-files'
    # Entry names assigned to the 'required' and 'optional' attributes.
    self.required = ('required-entry1', 'required-entry2')
    self.optional = ('optional-entry1', 'optional-entry2')
725
726
class PropertyFilesTest(PropertyFilesTestCase):
  """Covers Compute(), Finalize() and Verify() through TestPropertyFiles."""

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Compute(self):
    """Expects four tokens for a package with only the required entries."""
    names = ('required-entry1', 'required-entry2')
    package = self.construct_zip_package(names)
    test_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = test_files.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(4, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Compute_withOptionalEntries(self):
    """Expects six tokens once the optional entries are present as well."""
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    )
    package = self.construct_zip_package(names)
    test_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = test_files.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(6, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Compute_missingRequiredEntry(self):
    """Compute() raises KeyError when 'required-entry1' is absent."""
    package = self.construct_zip_package(('required-entry2',))
    test_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      with self.assertRaises(KeyError):
        test_files.Compute(package_zip)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize(self):
    """Finalize() with the exact raw length yields four verifiable tokens."""
    required = ['required-entry1', 'required-entry2']
    package = self.construct_zip_package(required + [
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ])
    test_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = test_files.GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = test_files.Finalize(package_zip, len(unpadded))
    parsed = self._parse_property_files_string(finalized)

    self.assertEqual(4, len(parsed))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata.
    self._verify_entries(
        package, parsed, required + ['metadata', 'metadata.pb'])

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize_assertReservedLength(self):
    """Finalize() honors the reserved length: exact, too small, and larger."""
    package = self.construct_zip_package((
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ))
    test_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      # The string without any padding space gives the minimum length.
      unpadded = test_files.GetPropertyFilesString(
          package_zip, reserve_space=False)
      exact_length = len(unpadded)

      # Exactly enough room: nothing gets padded.
      finalized = test_files.Finalize(package_zip, exact_length)
      self.assertEqual(exact_length, len(finalized))

      # One byte short: must be rejected.
      with self.assertRaises(PropertyFiles.InsufficientSpaceException):
        test_files.Finalize(package_zip, exact_length - 1)

      # More room than needed: the remainder is filled with spaces.
      padding = 20
      finalized = test_files.Finalize(package_zip, exact_length + padding)
      self.assertEqual(exact_length + padding, len(finalized))
      self.assertEqual(' ' * padding, finalized[exact_length:])

  def test_Verify(self):
    """Verify() accepts the raw string and rejects a modified one."""
    package = self.construct_zip_package((
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ))
    test_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      # The string without any padding space is what gets verified.
      unpadded = test_files.GetPropertyFilesString(
          package_zip, reserve_space=False)

      # A matching string verifies without raising.
      test_files.Verify(package_zip, unpadded)

      # A string with a trailing extra character must be rejected.
      with self.assertRaises(AssertionError):
        test_files.Verify(package_zip, unpadded + 'x')
850
851
class StreamingPropertyFilesTest(PropertyFilesTestCase):
  """Checks specific to the StreamingPropertyFiles subclass."""

  def test_init(self):
    """The constructor sets the expected name and entry tuples."""
    expected_required = ('payload.bin', 'payload_properties.txt')
    expected_optional = (
        'apex_info.pb',
        'care_map.pb',
        'care_map.txt',
        'compatibility.zip',
    )
    streaming_files = StreamingPropertyFiles()
    self.assertEqual('ota-streaming-property-files', streaming_files.name)
    self.assertEqual(expected_required, streaming_files.required)
    self.assertEqual(expected_optional, streaming_files.optional)

  def test_Compute(self):
    """Compute() yields six tokens for a package with four known entries."""
    names = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    )
    package = self.construct_zip_package(names)
    streaming_files = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = streaming_files.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(6, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Finalize(self):
    """Finalize() with the exact raw length yields six verifiable tokens."""
    payload_names = [
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    ]
    package = self.construct_zip_package(payload_names + [
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ])
    streaming_files = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = streaming_files.GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = streaming_files.Finalize(package_zip, len(unpadded))
    parsed = self._parse_property_files_string(finalized)

    self.assertEqual(6, len(parsed))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata.
    self._verify_entries(
        package, parsed, payload_names + ['metadata', 'metadata.pb'])

  def test_Verify(self):
    """Verify() accepts the raw string and rejects a modified one."""
    package = self.construct_zip_package((
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ))
    streaming_files = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      # The string without any padding space is what gets verified.
      unpadded = streaming_files.GetPropertyFilesString(
          package_zip, reserve_space=False)

      # A matching string verifies without raising.
      streaming_files.Verify(package_zip, unpadded)

      # A string with a trailing extra character must be rejected.
      with self.assertRaises(AssertionError):
        streaming_files.Verify(package_zip, unpadded + 'x')
935
936
class AbOtaPropertyFilesTest(PropertyFilesTestCase):
  """Checks specific to the AbOtaPropertyFiles subclass."""

  # Size in bytes of both the payload signature and the metadata signature.
  SIGNATURE_SIZE = 256

  def setUp(self):
    """Points the global signing options at the default test key."""
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    options = common.OPTIONS
    options.wipe_user_data = False
    options.payload_signer = None
    options.payload_signer_args = None
    options.package_key = os.path.join(self.testdata_dir, 'testkey')
    options.key_passwords = {options.package_key: None}

  def test_init(self):
    """The constructor sets the expected name and entry tuples."""
    expected_required = ('payload.bin', 'payload_properties.txt')
    expected_optional = (
        'apex_info.pb',
        'care_map.pb',
        'care_map.txt',
        'compatibility.zip',
    )
    ab_files = AbOtaPropertyFiles()
    self.assertEqual('ota-property-files', ab_files.name)
    self.assertEqual(expected_required, ab_files.required)
    self.assertEqual(expected_optional, ab_files.optional)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetPayloadMetadataOffsetAndSize(self):
    """Checks that the reported metadata range ends with its signature.

    The tail of the reported range is compared with the signature produced
    by hashing the payload through brillo_update_payload and signing that
    hash with the same PayloadSigner.
    """
    target_files = construct_target_files()
    payload = Payload()
    payload.Generate(target_files)

    signer = PayloadSigner()
    payload.Sign(signer)

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      payload.WriteToZip(package_zip)

    # Ask the property files object where the payload metadata lives.
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package) as package_zip:
      # pylint: disable=protected-access
      offset, total_size = ab_files._GetPayloadMetadataOffsetAndSize(
          package_zip)

    # The signature proto has the following format (details in
    #  /platform/system/update_engine/update_metadata.proto):
    #  message Signature {
    #    optional uint32 version = 1;
    #    optional bytes data = 2;
    #    optional fixed32 unpadded_signature_size = 3;
    #  }
    #
    # According to the protobuf encoding, the tail of the signature message
    # will be [signature string (256 bytes) + encoding of the fixed32 number
    # 256]. And 256 is encoded as '\x1d\x00\x01\x00\x00':
    # [3 (field number) << 3 | 5 (type) + byte reverse of 0x100 (256)].
    # Details in (https://developers.google.com/protocol-buffers/docs/encoding)
    size_field = b'\x1d\x00\x01\x00\x00'
    tail_length = self.SIGNATURE_SIZE + len(size_field)
    self.assertGreater(total_size, tail_length)
    with open(package, 'rb') as package_fp:
      package_fp.seek(offset + total_size - tail_length)
      proto_tail = package_fp.read(tail_length)

    self.assertEqual(size_field, proto_tail[-len(size_field):])
    actual_signature = proto_tail[:-len(size_field)]

    # Extract the metadata hash via the brillo_update_payload script, which
    # serves as the oracle result.
    payload_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    hash_cmd = [
        'brillo_update_payload', 'hash',
        '--unsigned_payload', payload.payload_file,
        '--signature_size', str(self.SIGNATURE_SIZE),
        '--metadata_hash_file', metadata_hash_file,
        '--payload_hash_file', payload_hash_file,
    ]
    hash_proc = common.Run(hash_cmd)
    hash_output, _ = hash_proc.communicate()
    self.assertEqual(
        0, hash_proc.returncode,
        'Failed to run brillo_update_payload:\n{}'.format(hash_output))

    expected_signature_file = signer.Sign(metadata_hash_file)

    # Both signatures must match byte for byte.
    with open(expected_signature_file, 'rb') as expected_fp:
      self.assertEqual(expected_fp.read(), actual_signature)

  @staticmethod
  def construct_zip_package_withValidPayload(with_metadata=False):
    """Returns the path of a zip holding a generated and signed payload.

    Args:
      with_metadata: Whether to also add the 'META-INF/com/android/metadata'
          and 'META-INF/com/android/metadata.pb' entries.
    """
    # Cannot use construct_zip_package() since we need a "valid" payload.bin.
    target_files = construct_target_files()
    payload = Payload()
    payload.Generate(target_files)

    signer = PayloadSigner()
    payload.Sign(signer)

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      # Written by the payload itself.
      payload.WriteToZip(package_zip)

      # Remaining entries get dummy content derived from their names.
      extra_names = ['care_map.txt', 'compatibility.zip']
      if with_metadata:
        extra_names += [
            'META-INF/com/android/metadata',
            'META-INF/com/android/metadata.pb',
        ]

      for name in extra_names:
        package_zip.writestr(
            name, name.replace('.', '-').upper(), zipfile.ZIP_STORED)

    return package

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Compute(self):
    """Compute() yields seven tokens for a package with a valid payload."""
    package = self.construct_zip_package_withValidPayload()
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = ab_files.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    # "7" includes the four entries above, two metadata entries, and one entry
    # for payload-metadata.bin.
    self.assertEqual(7, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize(self):
    """Finalize() with the exact raw length yields seven tokens."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = ab_files.GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = ab_files.Finalize(package_zip, len(unpadded))

    parsed = self._parse_property_files_string(finalized)
    # "7" includes the four entries above, two metadata entries, and one entry
    # for payload-metadata.bin.
    self.assertEqual(7, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Verify(self):
    """Verify() accepts the raw property-files string of a valid package."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = ab_files.GetPropertyFilesString(
          package_zip, reserve_space=False)

      ab_files.Verify(package_zip, unpadded)
1106
1107
class PayloadSignerTest(test_utils.ReleaseToolsTestCase):
  """Tests PayloadSigner construction and signing against the test keys."""

  # Names of the input file and of the expected signed output, both under
  # the testdata directory.
  SIGFILE = 'sigfile.bin'
  SIGNED_SIGFILE = 'signed-sigfile.bin'

  def setUp(self):
    """Points the global signing options at the default test key."""
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = []
    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key: None,
    }

  def _assertFilesEqual(self, file1, file2):
    """Asserts that the two files hold identical bytes."""
    with open(file1, 'rb') as fp1, open(file2, 'rb') as fp2:
      self.assertEqual(fp1.read(), fp2.read())

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_init(self):
    """Without an external signer, 'openssl' is used with a 256-byte limit."""
    payload_signer = PayloadSigner()
    self.assertEqual('openssl', payload_signer.signer)
    self.assertEqual(256, payload_signer.maximum_signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_init_withPassword(self):
    """A password-protected package key still selects 'openssl'."""
    common.OPTIONS.package_key = os.path.join(
        self.testdata_dir, 'testkey_with_passwd')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key: 'foo',
    }
    payload_signer = PayloadSigner()
    self.assertEqual('openssl', payload_signer.signer)

  def test_init_withExternalSigner(self):
    """The external signer, its args and its size limit are taken as given."""
    # setUp() does not reset this option, so restore its previous value once
    # the test is over instead of leaking '512' into later tests.
    self.addCleanup(
        setattr, common.OPTIONS, 'payload_signer_maximum_signature_size',
        getattr(common.OPTIONS, 'payload_signer_maximum_signature_size',
                None))
    common.OPTIONS.payload_signer = 'abc'
    common.OPTIONS.payload_signer_args = ['arg1', 'arg2']
    common.OPTIONS.payload_signer_maximum_signature_size = '512'
    payload_signer = PayloadSigner()
    self.assertEqual('abc', payload_signer.signer)
    self.assertEqual(['arg1', 'arg2'], payload_signer.signer_args)
    self.assertEqual(512, payload_signer.maximum_signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMaximumSignatureSizeInBytes_512Bytes(self):
    """The 'testkey_RSA4096.key' key reports a 512-byte signature size."""
    signing_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
    # pylint: disable=protected-access
    signature_size = PayloadSigner._GetMaximumSignatureSizeInBytes(signing_key)
    self.assertEqual(512, signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMaximumSignatureSizeInBytes_ECKey(self):
    """The 'testkey_EC.key' key reports a 72-byte signature size."""
    signing_key = os.path.join(self.testdata_dir, 'testkey_EC.key')
    # pylint: disable=protected-access
    signature_size = PayloadSigner._GetMaximumSignatureSizeInBytes(signing_key)
    self.assertEqual(72, signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign(self):
    """Signing SIGFILE with the default signer reproduces SIGNED_SIGFILE."""
    payload_signer = PayloadSigner()
    input_file = os.path.join(self.testdata_dir, self.SIGFILE)
    signed_file = payload_signer.Sign(input_file)

    verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(verify_file, signed_file)

  def test_Sign_withExternalSigner_openssl(self):
    """Uses openssl as the external payload signer."""
    common.OPTIONS.payload_signer = 'openssl'
    common.OPTIONS.payload_signer_args = [
        'pkeyutl', '-sign', '-keyform', 'DER', '-inkey',
        os.path.join(self.testdata_dir, 'testkey.pk8'),
        '-pkeyopt', 'digest:sha256']
    payload_signer = PayloadSigner()
    input_file = os.path.join(self.testdata_dir, self.SIGFILE)
    signed_file = payload_signer.Sign(input_file)

    verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(verify_file, signed_file)

  def test_Sign_withExternalSigner_script(self):
    """Uses testdata/payload_signer.sh as the external payload signer."""
    common.OPTIONS.payload_signer = os.path.join(
        self.testdata_dir, 'payload_signer.sh')
    # Give the owner read/write/execute permission on the script before it
    # is used as the signer.
    os.chmod(common.OPTIONS.payload_signer, 0o700)
    common.OPTIONS.payload_signer_args = [
        os.path.join(self.testdata_dir, 'testkey.pk8')]
    payload_signer = PayloadSigner()
    input_file = os.path.join(self.testdata_dir, self.SIGFILE)
    signed_file = payload_signer.Sign(input_file)

    verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(verify_file, signed_file)
1203
1204
class PayloadTest(test_utils.ReleaseToolsTestCase):
  """Covers Payload.Generate(), Payload.Sign() and Payload.WriteToZip()."""

  def setUp(self):
    """Points the global signing options at the default test key."""
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    options = common.OPTIONS
    options.wipe_user_data = False
    options.payload_signer = None
    options.payload_signer_args = None
    options.package_key = os.path.join(self.testdata_dir, 'testkey')
    options.key_passwords = {options.package_key: None}

  @staticmethod
  def _create_payload_full(secondary=False):
    """Returns a generated (unsigned) payload built from one target-files."""
    full_target = construct_target_files(secondary)
    full_payload = Payload(secondary)
    full_payload.Generate(full_target)
    return full_payload

  @staticmethod
  def _create_payload_incremental():
    """Returns a generated (unsigned) payload built from target and source."""
    new_build = construct_target_files()
    old_build = construct_target_files()
    incremental_payload = Payload()
    incremental_payload.Generate(new_build, old_build)
    return incremental_payload

  @staticmethod
  def _write_to_temp_zip(payload):
    """Writes the payload into a fresh temp zip and returns the zip path."""
    zip_path = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as zip_fp:
      payload.WriteToZip(zip_fp)
    return zip_path

  def _verify_payload_signature(self, zip_path):
    """Verifies the payload in zip_path against the test certificate."""
    import check_ota_package_signature
    check_ota_package_signature.VerifyAbOtaPayload(
        os.path.join(self.testdata_dir, 'testkey.x509.pem'),
        zip_path)

  def _assert_entries_present_and_stored(self, zip_path, names):
    """Asserts that each name is in the zip and is stored uncompressed."""
    with zipfile.ZipFile(zip_path) as verify_zip:
      # The essential entries must be present.
      present = verify_zip.namelist()
      for name in names:
        self.assertIn(name, present)

      # And each of them must be stored rather than compressed.
      for info in verify_zip.infolist():
        if info.filename in names:
          self.assertEqual(zipfile.ZIP_STORED, info.compress_type)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_full(self):
    """Generating a full payload leaves a payload file on disk."""
    full_payload = self._create_payload_full()
    self.assertTrue(os.path.exists(full_payload.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_incremental(self):
    """Generating an incremental payload leaves a payload file on disk."""
    incremental_payload = self._create_payload_incremental()
    self.assertTrue(os.path.exists(incremental_payload.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_additionalArgs(self):
    """The source build can be passed through additional_args instead."""
    new_build = construct_target_files()
    old_build = construct_target_files()
    payload = Payload()
    # This should work the same as calling payload.Generate(target_file,
    # source_file).
    extra_args = ["--source_image", old_build]
    payload.Generate(new_build, additional_args=extra_args)
    self.assertTrue(os.path.exists(payload.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_invalidInput(self):
    """Generate() raises ExternalError once IMAGES/vendor.img is removed."""
    broken_target = construct_target_files()
    common.ZipDelete(broken_target, 'IMAGES/vendor.img')
    payload = Payload()
    with self.assertRaises(common.ExternalError):
      payload.Generate(broken_target)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_full(self):
    """A signed full payload passes VerifyAbOtaPayload()."""
    full_payload = self._create_payload_full()
    full_payload.Sign(PayloadSigner())

    package = self._write_to_temp_zip(full_payload)
    self._verify_payload_signature(package)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_incremental(self):
    """A signed incremental payload passes VerifyAbOtaPayload()."""
    incremental_payload = self._create_payload_incremental()
    incremental_payload.Sign(PayloadSigner())

    package = self._write_to_temp_zip(incremental_payload)
    self._verify_payload_signature(package)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_withDataWipe(self):
    """With wipe_user_data set, the properties contain POWERWASH=1."""
    common.OPTIONS.wipe_user_data = True
    full_payload = self._create_payload_full()
    full_payload.Sign(PayloadSigner())

    with open(full_payload.payload_properties) as properties_fp:
      properties_text = properties_fp.read()
    self.assertIn("POWERWASH=1", properties_text)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_secondary(self):
    """A secondary payload's properties contain SWITCH_SLOT_ON_REBOOT=0."""
    secondary_payload = self._create_payload_full(secondary=True)
    secondary_payload.Sign(PayloadSigner())

    with open(secondary_payload.payload_properties) as properties_fp:
      properties_text = properties_fp.read()
    self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties_text)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_badSigner(self):
    """Tests that signing failure can be captured."""
    full_payload = self._create_payload_full()
    broken_signer = PayloadSigner()
    broken_signer.signer_args.append('bad-option')
    with self.assertRaises(common.ExternalError):
      full_payload.Sign(broken_signer)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip(self):
    """WriteToZip() stores the payload and its properties uncompressed."""
    full_payload = self._create_payload_full()
    full_payload.Sign(PayloadSigner())

    package = self._write_to_temp_zip(full_payload)
    self._assert_entries_present_and_stored(
        package, (Payload.PAYLOAD_BIN, Payload.PAYLOAD_PROPERTIES_TXT))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip_unsignedPayload(self):
    """Unsigned payloads should not be allowed to be written to zip."""
    full_payload = self._create_payload_full()

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      with self.assertRaises(AssertionError):
        full_payload.WriteToZip(package_zip)

    # The same must hold for an incremental payload.
    incremental_payload = self._create_payload_incremental()

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      with self.assertRaises(AssertionError):
        incremental_payload.WriteToZip(package_zip)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip_secondary(self):
    """WriteToZip() stores the secondary payload entries uncompressed."""
    secondary_payload = self._create_payload_full(secondary=True)
    secondary_payload.Sign(PayloadSigner())

    package = self._write_to_temp_zip(secondary_payload)
    self._assert_entries_present_and_stored(
        package,
        (Payload.SECONDARY_PAYLOAD_BIN,
         Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT))
1375
1376
1377class RuntimeFingerprintTest(test_utils.ReleaseToolsTestCase):
  # Lines that setUp() joins with newlines and writes to META/misc_info.txt.
  MISC_INFO = [
      'recovery_api_version=3',
      'fstab_version=2',
      'recovery_as_boot=true',
      'ab_update=true',
  ]

  # Base property lines the tests join with newlines and write to
  # SYSTEM/build.prop (some tests append extra ro.product.* lines first).
  BUILD_PROP = [
      'ro.build.id=build-id',
      'ro.build.version.incremental=version-incremental',
      'ro.build.type=build-type',
      'ro.build.tags=build-tags',
      'ro.build.version.release=version-release',
      'ro.build.version.release_or_codename=version-release',
      'ro.build.version.sdk=30',
      'ro.build.version.security_patch=2020',
      'ro.build.date.utc=12345678',
      'ro.system.build.version.release=version-release',
      'ro.system.build.id=build-id',
      'ro.system.build.version.incremental=version-incremental',
      'ro.system.build.type=build-type',
      'ro.system.build.tags=build-tags',
      'ro.system.build.version.sdk=30',
      'ro.system.build.version.security_patch=2020',
      'ro.system.build.date.utc=12345678',
      'ro.product.system.brand=generic',
      'ro.product.system.name=generic',
      'ro.product.system.device=generic',
  ]

  # Base property lines the tests join with newlines and write to
  # VENDOR/build.prop (some tests append 'import' lines first).
  VENDOR_BUILD_PROP = [
      'ro.vendor.build.version.release=version-release',
      'ro.vendor.build.id=build-id',
      'ro.vendor.build.version.incremental=version-incremental',
      'ro.vendor.build.type=build-type',
      'ro.vendor.build.tags=build-tags',
      'ro.vendor.build.version.sdk=30',
      'ro.vendor.build.version.security_patch=2020',
      'ro.vendor.build.date.utc=12345678',
      'ro.product.vendor.brand=vendor-product-brand',
      'ro.product.vendor.name=vendor-product-name',
      'ro.product.vendor.device=vendor-product-device'
  ]
1421
1422  def setUp(self):
1423    common.OPTIONS.oem_dicts = None
1424    self.test_dir = common.MakeTempDir()
1425    self.writeFiles({'META/misc_info.txt': '\n'.join(self.MISC_INFO)},
1426                    self.test_dir)
1427
1428  def writeFiles(self, contents_dict, out_dir):
1429    for path, content in contents_dict.items():
1430      abs_path = os.path.join(out_dir, path)
1431      dir_name = os.path.dirname(abs_path)
1432      if not os.path.exists(dir_name):
1433        os.makedirs(dir_name)
1434      with open(abs_path, 'w') as f:
1435        f.write(content)
1436
1437  @staticmethod
1438  def constructFingerprint(prefix):
1439    return '{}:version-release/build-id/version-incremental:' \
1440           'build-type/build-tags'.format(prefix)
1441
1442  def test_CalculatePossibleFingerprints_no_dynamic_fingerprint(self):
1443    build_prop = copy.deepcopy(self.BUILD_PROP)
1444    build_prop.extend([
1445        'ro.product.brand=product-brand',
1446        'ro.product.name=product-name',
1447        'ro.product.device=product-device',
1448    ])
1449    self.writeFiles({
1450        'SYSTEM/build.prop': '\n'.join(build_prop),
1451        'VENDOR/build.prop': '\n'.join(self.VENDOR_BUILD_PROP),
1452    }, self.test_dir)
1453
1454    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1455    expected = ({'product-device'},
1456                {self.constructFingerprint(
1457                    'product-brand/product-name/product-device')})
1458    self.assertEqual(expected,
1459                     CalculateRuntimeDevicesAndFingerprints(build_info, {}))
1460
1461  def test_CalculatePossibleFingerprints_single_override(self):
1462    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1463    vendor_build_prop.extend([
1464        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1465    ])
1466    self.writeFiles({
1467        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1468        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1469        'VENDOR/etc/build_std.prop':
1470        'ro.product.vendor.name=vendor-product-std',
1471        'VENDOR/etc/build_pro.prop':
1472        'ro.product.vendor.name=vendor-product-pro',
1473    }, self.test_dir)
1474
1475    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1476    boot_variable_values = {'ro.boot.sku_name': ['std', 'pro']}
1477
1478    expected = ({'vendor-product-device'}, {
1479        self.constructFingerprint(
1480            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1481        self.constructFingerprint(
1482            'vendor-product-brand/vendor-product-std/vendor-product-device'),
1483        self.constructFingerprint(
1484            'vendor-product-brand/vendor-product-pro/vendor-product-device'),
1485    })
1486    self.assertEqual(
1487        expected, CalculateRuntimeDevicesAndFingerprints(
1488            build_info, boot_variable_values))
1489
1490  def test_CalculatePossibleFingerprints_multiple_overrides(self):
1491    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1492    vendor_build_prop.extend([
1493        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1494        'import /vendor/etc/build_${ro.boot.device_name}.prop',
1495    ])
1496    self.writeFiles({
1497        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1498        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1499        'VENDOR/etc/build_std.prop':
1500        'ro.product.vendor.name=vendor-product-std',
1501        'VENDOR/etc/build_product1.prop':
1502        'ro.product.vendor.device=vendor-device-product1',
1503        'VENDOR/etc/build_pro.prop':
1504        'ro.product.vendor.name=vendor-product-pro',
1505        'VENDOR/etc/build_product2.prop':
1506        'ro.product.vendor.device=vendor-device-product2',
1507    }, self.test_dir)
1508
1509    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1510    boot_variable_values = {
1511        'ro.boot.sku_name': ['std', 'pro'],
1512        'ro.boot.device_name': ['product1', 'product2'],
1513    }
1514
1515    expected_devices = {'vendor-product-device', 'vendor-device-product1',
1516                        'vendor-device-product2'}
1517    expected_fingerprints = {
1518        self.constructFingerprint(
1519            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1520        self.constructFingerprint(
1521            'vendor-product-brand/vendor-product-std/vendor-device-product1'),
1522        self.constructFingerprint(
1523            'vendor-product-brand/vendor-product-pro/vendor-device-product1'),
1524        self.constructFingerprint(
1525            'vendor-product-brand/vendor-product-std/vendor-device-product2'),
1526        self.constructFingerprint(
1527            'vendor-product-brand/vendor-product-pro/vendor-device-product2')
1528    }
1529    self.assertEqual((expected_devices, expected_fingerprints),
1530                     CalculateRuntimeDevicesAndFingerprints(
1531                         build_info, boot_variable_values))
1532
1533  def test_GetPackageMetadata_full_package(self):
1534    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1535    vendor_build_prop.extend([
1536        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1537    ])
1538    self.writeFiles({
1539        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1540        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1541        'VENDOR/etc/build_std.prop':
1542        'ro.product.vendor.name=vendor-product-std',
1543        'VENDOR/etc/build_pro.prop':
1544        'ro.product.vendor.name=vendor-product-pro',
1545        AB_PARTITIONS: '\n'.join(['system', 'vendor']),
1546    }, self.test_dir)
1547
1548    common.OPTIONS.boot_variable_file = common.MakeTempFile()
1549    with open(common.OPTIONS.boot_variable_file, 'w') as f:
1550      f.write('ro.boot.sku_name=std,pro')
1551
1552    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1553    metadata_dict = BuildLegacyOtaMetadata(GetPackageMetadata(build_info))
1554    self.assertEqual('vendor-product-device', metadata_dict['pre-device'])
1555    fingerprints = [
1556        self.constructFingerprint(
1557            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1558        self.constructFingerprint(
1559            'vendor-product-brand/vendor-product-pro/vendor-product-device'),
1560        self.constructFingerprint(
1561            'vendor-product-brand/vendor-product-std/vendor-product-device'),
1562    ]
1563    self.assertEqual('|'.join(fingerprints), metadata_dict['post-build'])
1564
1565  def CheckMetadataEqual(self, metadata_dict, metadata_proto):
1566    post_build = metadata_proto.postcondition
1567    self.assertEqual('|'.join(post_build.build),
1568                     metadata_dict['post-build'])
1569    self.assertEqual(post_build.build_incremental,
1570                     metadata_dict['post-build-incremental'])
1571    self.assertEqual(post_build.sdk_level,
1572                     metadata_dict['post-sdk-level'])
1573    self.assertEqual(post_build.security_patch_level,
1574                     metadata_dict['post-security-patch-level'])
1575
1576    if metadata_proto.type == ota_metadata_pb2.OtaMetadata.AB:
1577      ota_type = 'AB'
1578    elif metadata_proto.type == ota_metadata_pb2.OtaMetadata.BLOCK:
1579      ota_type = 'BLOCK'
1580    else:
1581      ota_type = ''
1582    self.assertEqual(ota_type, metadata_dict['ota-type'])
1583    self.assertEqual(metadata_proto.wipe,
1584                     metadata_dict.get('ota-wipe') == 'yes')
1585    self.assertEqual(metadata_proto.required_cache,
1586                     int(metadata_dict.get('ota-required-cache', 0)))
1587    self.assertEqual(metadata_proto.retrofit_dynamic_partitions,
1588                     metadata_dict.get(
1589                         'ota-retrofit-dynamic-partitions') == 'yes')
1590
1591  def test_GetPackageMetadata_incremental_package(self):
1592    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1593    vendor_build_prop.extend([
1594        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1595    ])
1596    self.writeFiles({
1597        'META/misc_info.txt': '\n'.join(self.MISC_INFO),
1598        'META/ab_partitions.txt': '\n'.join(['system', 'vendor', 'product']),
1599        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1600        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1601        'VENDOR/etc/build_std.prop':
1602        'ro.product.vendor.device=vendor-device-std',
1603        'VENDOR/etc/build_pro.prop':
1604        'ro.product.vendor.device=vendor-device-pro',
1605    }, self.test_dir)
1606
1607    common.OPTIONS.boot_variable_file = common.MakeTempFile()
1608    with open(common.OPTIONS.boot_variable_file, 'w') as f:
1609      f.write('ro.boot.sku_name=std,pro')
1610
1611    source_dir = common.MakeTempDir()
1612    source_build_prop = [
1613        'ro.build.version.release=source-version-release',
1614        'ro.build.id=source-build-id',
1615        'ro.build.version.incremental=source-version-incremental',
1616        'ro.build.type=build-type',
1617        'ro.build.tags=build-tags',
1618        'ro.build.version.sdk=29',
1619        'ro.build.version.security_patch=2020',
1620        'ro.build.date.utc=12340000',
1621        'ro.system.build.version.release=source-version-release',
1622        'ro.system.build.id=source-build-id',
1623        'ro.system.build.version.incremental=source-version-incremental',
1624        'ro.system.build.type=build-type',
1625        'ro.system.build.tags=build-tags',
1626        'ro.system.build.version.sdk=29',
1627        'ro.system.build.version.security_patch=2020',
1628        'ro.system.build.date.utc=12340000',
1629        'ro.product.system.brand=generic',
1630        'ro.product.system.name=generic',
1631        'ro.product.system.device=generic',
1632    ]
1633    self.writeFiles({
1634        'META/misc_info.txt': '\n'.join(self.MISC_INFO),
1635        'META/ab_partitions.txt': '\n'.join(['system', 'vendor', 'product']),
1636        'SYSTEM/build.prop': '\n'.join(source_build_prop),
1637        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1638        'VENDOR/etc/build_std.prop':
1639        'ro.product.vendor.device=vendor-device-std',
1640        'VENDOR/etc/build_pro.prop':
1641        'ro.product.vendor.device=vendor-device-pro',
1642    }, source_dir)
1643    common.OPTIONS.incremental_source = source_dir
1644
1645    target_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1646    source_info = common.BuildInfo(common.LoadInfoDict(source_dir))
1647
1648    metadata_proto = GetPackageMetadata(target_info, source_info)
1649    metadata_dict = BuildLegacyOtaMetadata(metadata_proto)
1650    self.assertEqual(
1651        'vendor-device-pro|vendor-device-std|vendor-product-device',
1652        metadata_dict['pre-device'])
1653    source_suffix = ':source-version-release/source-build-id/' \
1654                    'source-version-incremental:build-type/build-tags'
1655    pre_fingerprints = [
1656        'vendor-product-brand/vendor-product-name/vendor-device-pro'
1657        '{}'.format(source_suffix),
1658        'vendor-product-brand/vendor-product-name/vendor-device-std'
1659        '{}'.format(source_suffix),
1660        'vendor-product-brand/vendor-product-name/vendor-product-device'
1661        '{}'.format(source_suffix),
1662    ]
1663    self.assertEqual('|'.join(pre_fingerprints), metadata_dict['pre-build'])
1664
1665    post_fingerprints = [
1666        self.constructFingerprint(
1667            'vendor-product-brand/vendor-product-name/vendor-device-pro'),
1668        self.constructFingerprint(
1669            'vendor-product-brand/vendor-product-name/vendor-device-std'),
1670        self.constructFingerprint(
1671            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1672    ]
1673    self.assertEqual('|'.join(post_fingerprints), metadata_dict['post-build'])
1674
1675    self.CheckMetadataEqual(metadata_dict, metadata_proto)
1676
1677    pre_partition_states = metadata_proto.precondition.partition_state
1678    self.assertEqual(2, len(pre_partition_states))
1679    self.assertEqual('system', pre_partition_states[0].partition_name)
1680    self.assertEqual(['generic'], pre_partition_states[0].device)
1681    self.assertEqual(['generic/generic/generic{}'.format(source_suffix)],
1682                     pre_partition_states[0].build)
1683
1684    self.assertEqual('vendor', pre_partition_states[1].partition_name)
1685    self.assertEqual(['vendor-device-pro', 'vendor-device-std',
1686                      'vendor-product-device'], pre_partition_states[1].device)
1687    vendor_fingerprints = post_fingerprints
1688    self.assertEqual(vendor_fingerprints, pre_partition_states[1].build)
1689
1690    post_partition_states = metadata_proto.postcondition.partition_state
1691    self.assertEqual(2, len(post_partition_states))
1692    self.assertEqual('system', post_partition_states[0].partition_name)
1693    self.assertEqual(['generic'], post_partition_states[0].device)
1694    self.assertEqual([self.constructFingerprint('generic/generic/generic')],
1695                     post_partition_states[0].build)
1696
1697    self.assertEqual('vendor', post_partition_states[1].partition_name)
1698    self.assertEqual(['vendor-device-pro', 'vendor-device-std',
1699                      'vendor-product-device'], post_partition_states[1].device)
1700    self.assertEqual(vendor_fingerprints, post_partition_states[1].build)
1701