• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import os.path
20import tempfile
21import zipfile
22
23import common
24import ota_metadata_pb2
25import test_utils
26from ota_utils import (
27    BuildLegacyOtaMetadata, CalculateRuntimeDevicesAndFingerprints,
28    ConstructOtaApexInfo, FinalizeMetadata, GetPackageMetadata, PropertyFiles, AbOtaPropertyFiles, PayloadGenerator, StreamingPropertyFiles)
29from ota_from_target_files import (
30    _LoadOemDicts,
31    GetTargetFilesZipForCustomImagesUpdates,
32    GetTargetFilesZipForPartialUpdates,
33    GetTargetFilesZipForSecondaryImages,
34    GetTargetFilesZipWithoutPostinstallConfig,
35    POSTINSTALL_CONFIG, AB_PARTITIONS)
36from apex_utils import GetApexInfoFromTargetFiles
37from test_utils import PropertyFilesTestCase
38from common import OPTIONS
39from payload_signer import PayloadSigner
40
41
def construct_target_files(secondary=False, compressedApex=False):
  """Builds a small fake target-files.zip for the OTA generation tests.

  Args:
    secondary: When true, IMAGES/system_other.img is added as well. That image
        is not listed in META/ab_partitions.txt.
    compressedApex: When true, the compressed APEX test file found in the
        current test directory is stored under SYSTEM/apex/.

  Returns:
    The path of the temp zip file that was written.
  """
  # (zip directory, partition name) of every A/B partition in the package.
  image_specs = (
      ('IMAGES', 'boot'),
      ('IMAGES', 'system'),
      ('IMAGES', 'vendor'),
      ('RADIO', 'bootloader'),
      ('RADIO', 'modem'),
  )
  postinstall_lines = (
      "RUN_POSTINSTALL_system=true",
      "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
      "FILESYSTEM_TYPE_system=ext4",
      "POSTINSTALL_OPTIONAL_system=true",
  )

  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
    # META/update_engine_config.txt
    output_zip.writestr(
        'META/update_engine_config.txt',
        "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")

    # META/postinstall_config.txt
    output_zip.writestr(POSTINSTALL_CONFIG, '\n'.join(postinstall_lines))

    # META/ab_partitions.txt lists only the partition names.
    output_zip.writestr(
        'META/ab_partitions.txt',
        '\n'.join(name for _, name in image_specs))

    # Each partition gets a fake image made of a few random bytes.
    for directory, name in image_specs:
      image_entry = '{}/{}.img'.format(directory, name)
      output_zip.writestr(image_entry, os.urandom(len(name)))

    # system_other shouldn't appear in META/ab_partitions.txt.
    if secondary:
      output_zip.writestr('IMAGES/system_other.img',
                          os.urandom(len("system_other")))

    if compressedApex:
      capex_name = 'com.android.apex.compressed.v1.capex'
      capex_path = os.path.join(test_utils.get_current_dir(), capex_name)
      output_zip.write(capex_path, 'SYSTEM/apex/' + capex_name)

  return zip_path
90
91
class LoadOemDictsTest(test_utils.ReleaseToolsTestCase):
  """Tests _LoadOemDicts() with no, one and several OEM property files."""

  @staticmethod
  def _WriteOemDictFile(contents):
    """Writes contents into a fresh temp file and returns the file's path."""
    path = common.MakeTempFile()
    with open(path, 'w') as fp:
      fp.write(contents)
    return path

  def test_NoneDict(self):
    loaded = _LoadOemDicts(None)
    self.assertIsNone(loaded)

  def test_SingleDict(self):
    path = self._WriteOemDictFile('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')

    loaded = _LoadOemDicts([path])
    self.assertEqual(1, len(loaded))
    self.assertEqual('foo', loaded[0]['xyz'])
    self.assertEqual('bar', loaded[0]['a.b.c'])

  def test_MultipleDicts(self):
    # Three files that differ only in their ro.build.index value.
    paths = [
        self._WriteOemDictFile(
            'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'.format(index))
        for index in range(3)
    ]

    loaded = _LoadOemDicts(paths)
    self.assertEqual(3, len(loaded))
    for index, props in enumerate(loaded):
      self.assertEqual('2', props['def'])
      self.assertEqual('foo', props['xyz'])
      self.assertEqual('bar', props['a.b.c'])
      self.assertEqual(str(index), props['ro.build.index'])
123
124
class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
  """Tests for OTA metadata generation and target-files zip helpers.

  The class-level dicts below are shared fixtures. Tests that need to change
  one take a copy.deepcopy() of it first.
  """

  # Info dict of the target build: SDK 27, build timestamp 1500000000.
  TEST_TARGET_INFO_DICT = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.device': 'product-device',
              'ro.build.fingerprint': 'build-fingerprint-target',
              'ro.build.version.incremental': 'build-version-incremental-target',
              'ro.build.version.sdk': '27',
              'ro.build.version.security_patch': '2017-12-01',
              'ro.build.date.utc': '1500000000'}
      )
  }

  # Info dict of the source build: same device, with an older SDK level (25),
  # security patch and build timestamp (1400000000) than the target.
  TEST_SOURCE_INFO_DICT = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.device': 'product-device',
              'ro.build.fingerprint': 'build-fingerprint-source',
              'ro.build.version.incremental': 'build-version-incremental-source',
              'ro.build.version.sdk': '25',
              'ro.build.version.security_patch': '2016-12-01',
              'ro.build.date.utc': '1400000000'}
      )
  }

  # Info dict that declares 'oem_fingerprint_properties'.
  # NOTE(review): no test in the visible part of this file reads it; confirm
  # it is still used before changing or removing it.
  TEST_INFO_DICT_USES_OEM_PROPS = {
      'build.prop': common.PartitionBuildProps.FromDictionary(
          'system', {
              'ro.product.name': 'product-name',
              'ro.build.thumbprint': 'build-thumbprint',
              'ro.build.bar': 'build-bar'}
      ),
      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
          'vendor', {
              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
      ),
      'property1': 'value1',
      'property2': 4096,
      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
  }
165
166  def setUp(self):
167    self.testdata_dir = test_utils.get_testdata_dir()
168    self.assertTrue(os.path.exists(self.testdata_dir))
169
170    # Reset the global options as in ota_from_target_files.py.
171    common.OPTIONS.incremental_source = None
172    common.OPTIONS.downgrade = False
173    common.OPTIONS.retrofit_dynamic_partitions = False
174    common.OPTIONS.timestamp = False
175    common.OPTIONS.wipe_user_data = False
176    common.OPTIONS.no_signing = False
177    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
178    common.OPTIONS.key_passwords = {
179        common.OPTIONS.package_key: None,
180    }
181
182    common.OPTIONS.search_path = test_utils.get_search_path()
183
184  @staticmethod
185  def GetLegacyOtaMetadata(target_info, source_info=None):
186    metadata_proto = GetPackageMetadata(target_info, source_info)
187    return BuildLegacyOtaMetadata(metadata_proto)
188
189  def test_GetPackageMetadata_abOta_full(self):
190    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
191    target_info_dict['ab_update'] = 'true'
192    target_info_dict['ab_partitions'] = []
193    target_info = common.BuildInfo(target_info_dict, None)
194    metadata = self.GetLegacyOtaMetadata(target_info)
195    self.assertDictEqual(
196        {
197            'ota-type': 'AB',
198            'ota-required-cache': '0',
199            'post-build': 'build-fingerprint-target',
200            'post-build-incremental': 'build-version-incremental-target',
201            'post-sdk-level': '27',
202            'post-security-patch-level': '2017-12-01',
203            'post-timestamp': '1500000000',
204            'pre-device': 'product-device',
205        },
206        metadata)
207
208  def test_GetPackageMetadata_abOta_incremental(self):
209    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
210    target_info_dict['ab_update'] = 'true'
211    target_info_dict['ab_partitions'] = []
212    target_info = common.BuildInfo(target_info_dict, None)
213    source_info = common.BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
214    common.OPTIONS.incremental_source = ''
215    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
216    self.assertDictEqual(
217        {
218            'ota-type': 'AB',
219            'ota-required-cache': '0',
220            'post-build': 'build-fingerprint-target',
221            'post-build-incremental': 'build-version-incremental-target',
222            'post-sdk-level': '27',
223            'post-security-patch-level': '2017-12-01',
224            'post-timestamp': '1500000000',
225            'pre-device': 'product-device',
226            'pre-build': 'build-fingerprint-source',
227            'pre-build-incremental': 'build-version-incremental-source',
228        },
229        metadata)
230
231  def test_GetPackageMetadata_nonAbOta_full(self):
232    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
233    metadata = self.GetLegacyOtaMetadata(target_info)
234    self.assertDictEqual(
235        {
236            'ota-type': 'BLOCK',
237            'ota-required-cache': '0',
238            'post-build': 'build-fingerprint-target',
239            'post-build-incremental': 'build-version-incremental-target',
240            'post-sdk-level': '27',
241            'post-security-patch-level': '2017-12-01',
242            'post-timestamp': '1500000000',
243            'pre-device': 'product-device',
244        },
245        metadata)
246
247  def test_GetPackageMetadata_nonAbOta_incremental(self):
248    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
249    source_info = common.BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
250    common.OPTIONS.incremental_source = ''
251    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
252    self.assertDictEqual(
253        {
254            'ota-type': 'BLOCK',
255            'ota-required-cache': '0',
256            'post-build': 'build-fingerprint-target',
257            'post-build-incremental': 'build-version-incremental-target',
258            'post-sdk-level': '27',
259            'post-security-patch-level': '2017-12-01',
260            'post-timestamp': '1500000000',
261            'pre-device': 'product-device',
262            'pre-build': 'build-fingerprint-source',
263            'pre-build-incremental': 'build-version-incremental-source',
264        },
265        metadata)
266
267  def test_GetPackageMetadata_wipe(self):
268    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
269    common.OPTIONS.wipe_user_data = True
270    metadata = self.GetLegacyOtaMetadata(target_info)
271    self.assertDictEqual(
272        {
273            'ota-type': 'BLOCK',
274            'ota-required-cache': '0',
275            'ota-wipe': 'yes',
276            'post-build': 'build-fingerprint-target',
277            'post-build-incremental': 'build-version-incremental-target',
278            'post-sdk-level': '27',
279            'post-security-patch-level': '2017-12-01',
280            'post-timestamp': '1500000000',
281            'pre-device': 'product-device',
282        },
283        metadata)
284
285  @test_utils.SkipIfExternalToolsUnavailable()
286  def test_GetApexInfoFromTargetFiles(self):
287    target_files = construct_target_files(compressedApex=True)
288    apex_infos = GetApexInfoFromTargetFiles(target_files, 'system')
289    self.assertEqual(len(apex_infos), 1)
290    self.assertEqual(apex_infos[0].package_name, "com.android.apex.compressed")
291    self.assertEqual(apex_infos[0].version, 1)
292    self.assertEqual(apex_infos[0].is_compressed, True)
293    # Compare the decompressed APEX size with the original uncompressed APEX
294    original_apex_name = 'com.android.apex.compressed.v1_original.apex'
295    original_apex_filepath = os.path.join(
296        test_utils.get_current_dir(), original_apex_name)
297    uncompressed_apex_size = os.path.getsize(original_apex_filepath)
298    self.assertEqual(apex_infos[0].decompressed_size, uncompressed_apex_size)
299
300  @staticmethod
301  def construct_tf_with_apex_info(infos):
302    apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
303    apex_metadata_proto.apex_info.extend(infos)
304
305    output = common.MakeTempFile(suffix='.zip')
306    with zipfile.ZipFile(output, 'w') as zfp:
307      common.ZipWriteStr(zfp, "META/apex_info.pb",
308                         apex_metadata_proto.SerializeToString())
309    return output
310
311  def test_ConstructOtaApexInfo_incremental_package(self):
312    infos = [ota_metadata_pb2.ApexInfo(package_name='com.android.apex.1',
313                                       version=1000, is_compressed=False),
314             ota_metadata_pb2.ApexInfo(package_name='com.android.apex.2',
315                                       version=2000, is_compressed=True)]
316    target_file = self.construct_tf_with_apex_info(infos)
317
318    with zipfile.ZipFile(target_file) as target_zip:
319      info_bytes = ConstructOtaApexInfo(target_zip, source_file=target_file)
320    apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
321    apex_metadata_proto.ParseFromString(info_bytes)
322
323    info_list = apex_metadata_proto.apex_info
324    self.assertEqual(2, len(info_list))
325    self.assertEqual('com.android.apex.1', info_list[0].package_name)
326    self.assertEqual(1000, info_list[0].version)
327    self.assertEqual(1000, info_list[0].source_version)
328
329  def test_GetPackageMetadata_retrofitDynamicPartitions(self):
330    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
331    common.OPTIONS.retrofit_dynamic_partitions = True
332    metadata = self.GetLegacyOtaMetadata(target_info)
333    self.assertDictEqual(
334        {
335            'ota-retrofit-dynamic-partitions': 'yes',
336            'ota-type': 'BLOCK',
337            'ota-required-cache': '0',
338            'post-build': 'build-fingerprint-target',
339            'post-build-incremental': 'build-version-incremental-target',
340            'post-sdk-level': '27',
341            'post-security-patch-level': '2017-12-01',
342            'post-timestamp': '1500000000',
343            'pre-device': 'product-device',
344        },
345        metadata)
346
347  @staticmethod
348  def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
349    (target_info['build.prop'].build_props['ro.build.date.utc'],
350     source_info['build.prop'].build_props['ro.build.date.utc']) = (
351         source_info['build.prop'].build_props['ro.build.date.utc'],
352         target_info['build.prop'].build_props['ro.build.date.utc'])
353
354  def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
355    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
356    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
357    self._test_GetPackageMetadata_swapBuildTimestamps(
358        target_info_dict, source_info_dict)
359
360    target_info = common.BuildInfo(target_info_dict, None)
361    source_info = common.BuildInfo(source_info_dict, None)
362    common.OPTIONS.incremental_source = ''
363    self.assertRaises(RuntimeError, self.GetLegacyOtaMetadata, target_info,
364                      source_info)
365
366  def test_GetPackageMetadata_downgrade(self):
367    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
368    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
369    self._test_GetPackageMetadata_swapBuildTimestamps(
370        target_info_dict, source_info_dict)
371
372    target_info = common.BuildInfo(target_info_dict, None)
373    source_info = common.BuildInfo(source_info_dict, None)
374    common.OPTIONS.incremental_source = ''
375    common.OPTIONS.downgrade = True
376    common.OPTIONS.wipe_user_data = True
377    common.OPTIONS.spl_downgrade = True
378    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
379    # Reset spl_downgrade so other tests are unaffected
380    common.OPTIONS.spl_downgrade = False
381
382    self.assertDictEqual(
383        {
384            'ota-downgrade': 'yes',
385            'ota-type': 'BLOCK',
386            'ota-required-cache': '0',
387            'ota-wipe': 'yes',
388            'post-build': 'build-fingerprint-target',
389            'post-build-incremental': 'build-version-incremental-target',
390            'post-sdk-level': '27',
391            'post-security-patch-level': '2017-12-01',
392            'post-timestamp': '1400000000',
393            'pre-device': 'product-device',
394            'pre-build': 'build-fingerprint-source',
395            'pre-build-incremental': 'build-version-incremental-source',
396            'spl-downgrade': 'yes',
397        },
398        metadata)
399
400  @test_utils.SkipIfExternalToolsUnavailable()
401  def test_GetTargetFilesZipForSecondaryImages(self):
402    input_file = construct_target_files(secondary=True)
403    target_file = GetTargetFilesZipForSecondaryImages(input_file)
404
405    with zipfile.ZipFile(target_file) as verify_zip:
406      namelist = verify_zip.namelist()
407      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
408
409    self.assertIn('META/ab_partitions.txt', namelist)
410    self.assertIn('IMAGES/system.img', namelist)
411    self.assertIn('RADIO/bootloader.img', namelist)
412    self.assertIn(POSTINSTALL_CONFIG, namelist)
413
414    self.assertNotIn('IMAGES/boot.img', namelist)
415    self.assertNotIn('IMAGES/system_other.img', namelist)
416    self.assertNotIn('IMAGES/system.map', namelist)
417    self.assertNotIn('RADIO/modem.img', namelist)
418
419    expected_ab_partitions = ['system', 'bootloader']
420    self.assertEqual('\n'.join(expected_ab_partitions), ab_partitions)
421
422  @test_utils.SkipIfExternalToolsUnavailable()
423  def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
424    input_file = construct_target_files(secondary=True)
425    target_file = GetTargetFilesZipForSecondaryImages(
426        input_file, skip_postinstall=True)
427
428    with zipfile.ZipFile(target_file) as verify_zip:
429      namelist = verify_zip.namelist()
430
431    self.assertIn('META/ab_partitions.txt', namelist)
432    self.assertIn('IMAGES/system.img', namelist)
433    self.assertIn('RADIO/bootloader.img', namelist)
434
435    self.assertNotIn('IMAGES/boot.img', namelist)
436    self.assertNotIn('IMAGES/system_other.img', namelist)
437    self.assertNotIn('IMAGES/system.map', namelist)
438    self.assertNotIn('RADIO/modem.img', namelist)
439    self.assertNotIn(POSTINSTALL_CONFIG, namelist)
440
441  @test_utils.SkipIfExternalToolsUnavailable()
442  def test_GetTargetFilesZipForSecondaryImages_withoutRadioImages(self):
443    input_file = construct_target_files(secondary=True)
444    common.ZipDelete(input_file, 'RADIO/bootloader.img')
445    common.ZipDelete(input_file, 'RADIO/modem.img')
446    target_file = GetTargetFilesZipForSecondaryImages(input_file)
447
448    with zipfile.ZipFile(target_file) as verify_zip:
449      namelist = verify_zip.namelist()
450
451    self.assertIn('META/ab_partitions.txt', namelist)
452    self.assertIn('IMAGES/system.img', namelist)
453    self.assertIn(POSTINSTALL_CONFIG, namelist)
454
455    self.assertNotIn('IMAGES/boot.img', namelist)
456    self.assertNotIn('IMAGES/system_other.img', namelist)
457    self.assertNotIn('IMAGES/system.map', namelist)
458    self.assertNotIn('RADIO/bootloader.img', namelist)
459    self.assertNotIn('RADIO/modem.img', namelist)
460
461  @test_utils.SkipIfExternalToolsUnavailable()
462  def test_GetTargetFilesZipForSecondaryImages_dynamicPartitions(self):
463    input_file = construct_target_files(secondary=True)
464    misc_info = '\n'.join([
465        'use_dynamic_partition_size=true',
466        'use_dynamic_partitions=true',
467        'dynamic_partition_list=system vendor product',
468        'super_partition_groups=google_dynamic_partitions',
469        'super_google_dynamic_partitions_group_size=4873781248',
470        'super_google_dynamic_partitions_partition_list=system vendor product',
471    ])
472    dynamic_partitions_info = '\n'.join([
473        'super_partition_groups=google_dynamic_partitions',
474        'super_google_dynamic_partitions_group_size=4873781248',
475        'super_google_dynamic_partitions_partition_list=system vendor product',
476    ])
477
478    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
479      common.ZipWriteStr(append_zip, 'META/misc_info.txt', misc_info)
480      common.ZipWriteStr(append_zip, 'META/dynamic_partitions_info.txt',
481                         dynamic_partitions_info)
482
483    target_file = GetTargetFilesZipForSecondaryImages(input_file)
484
485    with zipfile.ZipFile(target_file) as verify_zip:
486      namelist = verify_zip.namelist()
487      updated_misc_info = verify_zip.read('META/misc_info.txt').decode()
488      updated_dynamic_partitions_info = verify_zip.read(
489          'META/dynamic_partitions_info.txt').decode()
490
491    self.assertIn('META/ab_partitions.txt', namelist)
492    self.assertIn('IMAGES/system.img', namelist)
493    self.assertIn(POSTINSTALL_CONFIG, namelist)
494    self.assertIn('META/misc_info.txt', namelist)
495    self.assertIn('META/dynamic_partitions_info.txt', namelist)
496
497    self.assertNotIn('IMAGES/boot.img', namelist)
498    self.assertNotIn('IMAGES/system_other.img', namelist)
499    self.assertNotIn('IMAGES/system.map', namelist)
500
501    # Check the vendor & product are removed from the partitions list.
502    expected_misc_info = misc_info.replace('system vendor product',
503                                           'system')
504    expected_dynamic_partitions_info = dynamic_partitions_info.replace(
505        'system vendor product', 'system')
506    self.assertEqual(expected_misc_info, updated_misc_info)
507    self.assertEqual(expected_dynamic_partitions_info,
508                     updated_dynamic_partitions_info)
509
510  @test_utils.SkipIfExternalToolsUnavailable()
511  def test_GetTargetFilesZipForPartialUpdates_singlePartition(self):
512    input_file = construct_target_files()
513    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
514      common.ZipWriteStr(append_zip, 'IMAGES/system.map', 'fake map')
515
516    target_file = GetTargetFilesZipForPartialUpdates(input_file, ['system'])
517    with zipfile.ZipFile(target_file) as verify_zip:
518      namelist = verify_zip.namelist()
519      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
520
521    self.assertIn('META/ab_partitions.txt', namelist)
522    self.assertIn('META/update_engine_config.txt', namelist)
523    self.assertIn('IMAGES/system.img', namelist)
524    self.assertIn('IMAGES/system.map', namelist)
525
526    self.assertNotIn('IMAGES/boot.img', namelist)
527    self.assertNotIn('IMAGES/system_other.img', namelist)
528    self.assertNotIn('RADIO/bootloader.img', namelist)
529    self.assertNotIn('RADIO/modem.img', namelist)
530
531    self.assertEqual('system', ab_partitions)
532
533  @test_utils.SkipIfExternalToolsUnavailable()
534  def test_GetTargetFilesZipForPartialUpdates_unrecognizedPartition(self):
535    input_file = construct_target_files()
536    self.assertRaises(ValueError, GetTargetFilesZipForPartialUpdates,
537                      input_file, ['product'])
538
539  @test_utils.SkipIfExternalToolsUnavailable()
540  def test_GetTargetFilesZipForPartialUpdates_dynamicPartitions(self):
541    input_file = construct_target_files(secondary=True)
542    misc_info = '\n'.join([
543        'use_dynamic_partition_size=true',
544        'use_dynamic_partitions=true',
545        'dynamic_partition_list=system vendor product',
546        'super_partition_groups=google_dynamic_partitions',
547        'super_google_dynamic_partitions_group_size=4873781248',
548        'super_google_dynamic_partitions_partition_list=system vendor product',
549    ])
550    dynamic_partitions_info = '\n'.join([
551        'super_partition_groups=google_dynamic_partitions',
552        'super_google_dynamic_partitions_group_size=4873781248',
553        'super_google_dynamic_partitions_partition_list=system vendor product',
554    ])
555
556    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
557      common.ZipWriteStr(append_zip, 'META/misc_info.txt', misc_info)
558      common.ZipWriteStr(append_zip, 'META/dynamic_partitions_info.txt',
559                         dynamic_partitions_info)
560
561    target_file = GetTargetFilesZipForPartialUpdates(input_file,
562                                                     ['boot', 'system'])
563    with zipfile.ZipFile(target_file) as verify_zip:
564      namelist = verify_zip.namelist()
565      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
566      updated_misc_info = verify_zip.read('META/misc_info.txt').decode()
567      updated_dynamic_partitions_info = verify_zip.read(
568          'META/dynamic_partitions_info.txt').decode()
569
570    self.assertIn('META/ab_partitions.txt', namelist)
571    self.assertIn('IMAGES/boot.img', namelist)
572    self.assertIn('IMAGES/system.img', namelist)
573    self.assertIn('META/misc_info.txt', namelist)
574    self.assertIn('META/dynamic_partitions_info.txt', namelist)
575
576    self.assertNotIn('IMAGES/system_other.img', namelist)
577    self.assertNotIn('RADIO/bootloader.img', namelist)
578    self.assertNotIn('RADIO/modem.img', namelist)
579
580    # Check the vendor & product are removed from the partitions list.
581    expected_misc_info = misc_info.replace('system vendor product',
582                                           'system')
583    expected_dynamic_partitions_info = dynamic_partitions_info.replace(
584        'system vendor product', 'system')
585    self.assertEqual(expected_misc_info, updated_misc_info)
586    self.assertEqual(expected_dynamic_partitions_info,
587                     updated_dynamic_partitions_info)
588    self.assertEqual('boot\nsystem', ab_partitions)
589
590  @test_utils.SkipIfExternalToolsUnavailable()
591  def test_GetTargetFilesZipWithoutPostinstallConfig(self):
592    input_file = construct_target_files()
593    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
594    with zipfile.ZipFile(target_file) as verify_zip:
595      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
596
597  @test_utils.SkipIfExternalToolsUnavailable()
598  def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
599    input_file = construct_target_files()
600    common.ZipDelete(input_file, POSTINSTALL_CONFIG)
601    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
602    with zipfile.ZipFile(target_file) as verify_zip:
603      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
604
605  @test_utils.SkipIfExternalToolsUnavailable()
606  def test_GetTargetFilesZipForCustomImagesUpdates_oemDefaultImage(self):
607    input_file = construct_target_files()
608    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
609      common.ZipWriteStr(append_zip, 'IMAGES/oem.img', 'oem')
610      common.ZipWriteStr(append_zip, 'IMAGES/oem_test.img', 'oem_test')
611
612    target_file = GetTargetFilesZipForCustomImagesUpdates(
613        input_file, {'oem': 'oem.img'})
614
615    with zipfile.ZipFile(target_file) as verify_zip:
616      namelist = verify_zip.namelist()
617      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
618      oem_image = verify_zip.read('IMAGES/oem.img').decode()
619
620    self.assertIn('META/ab_partitions.txt', namelist)
621    self.assertEqual('boot\nsystem\nvendor\nbootloader\nmodem', ab_partitions)
622    self.assertIn('IMAGES/oem.img', namelist)
623    self.assertEqual('oem', oem_image)
624
625  @test_utils.SkipIfExternalToolsUnavailable()
626  def test_GetTargetFilesZipForCustomImagesUpdates_oemTestImage(self):
627    input_file = construct_target_files()
628    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
629      common.ZipWriteStr(append_zip, 'IMAGES/oem.img', 'oem')
630      common.ZipWriteStr(append_zip, 'IMAGES/oem_test.img', 'oem_test')
631
632    target_file = GetTargetFilesZipForCustomImagesUpdates(
633        input_file, {'oem': 'oem_test.img'})
634
635    with zipfile.ZipFile(target_file) as verify_zip:
636      namelist = verify_zip.namelist()
637      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
638      oem_image = verify_zip.read('IMAGES/oem.img').decode()
639
640    self.assertIn('META/ab_partitions.txt', namelist)
641    self.assertEqual('boot\nsystem\nvendor\nbootloader\nmodem', ab_partitions)
642    self.assertIn('IMAGES/oem.img', namelist)
643    self.assertEqual('oem_test', oem_image)
644
645  def _test_FinalizeMetadata(self, large_entry=False):
646    entries = [
647        'required-entry1',
648        'required-entry2',
649    ]
650    zip_file = PropertyFilesTest.construct_zip_package(entries)
651    # Add a large entry of 1 GiB if requested.
652    if large_entry:
653      with zipfile.ZipFile(zip_file, 'a', allowZip64=True) as zip_fp:
654        zip_fp.writestr(
655            # Using 'zoo' so that the entry stays behind others after signing.
656            'zoo',
657            'A' * 1024 * 1024 * 1024,
658            zipfile.ZIP_STORED)
659
660    metadata = ota_metadata_pb2.OtaMetadata()
661    output_file = common.MakeTempFile(suffix='.zip')
662    needed_property_files = (
663        TestPropertyFiles(),
664    )
665    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
666    self.assertIn('ota-test-property-files', metadata.property_files)
667
668  @test_utils.SkipIfExternalToolsUnavailable()
669  def test_FinalizeMetadata(self):
670    self._test_FinalizeMetadata()
671
672  @test_utils.SkipIfExternalToolsUnavailable()
673  def test_FinalizeMetadata_withNoSigning(self):
674    common.OPTIONS.no_signing = True
675    self._test_FinalizeMetadata()
676
677  @test_utils.SkipIfExternalToolsUnavailable()
678  def test_FinalizeMetadata_largeEntry(self):
679    self._test_FinalizeMetadata(large_entry=True)
680
681  @test_utils.SkipIfExternalToolsUnavailable()
682  def test_FinalizeMetadata_largeEntry_withNoSigning(self):
683    common.OPTIONS.no_signing = True
684    self._test_FinalizeMetadata(large_entry=True)
685
686  @test_utils.SkipIfExternalToolsUnavailable()
687  def test_FinalizeMetadata_insufficientSpace(self):
688    entries = [
689        'required-entry1',
690        'required-entry2',
691        'optional-entry1',
692        'optional-entry2',
693    ]
694    zip_file = PropertyFilesTest.construct_zip_package(entries)
695    with zipfile.ZipFile(zip_file, 'a', allowZip64=True) as zip_fp:
696      zip_fp.writestr(
697          # 'foo-entry1' will appear ahead of all other entries (in alphabetical
698          # order) after the signing, which will in turn trigger the
699          # InsufficientSpaceException and an automatic retry.
700          'foo-entry1',
701          'A' * 1024 * 1024,
702          zipfile.ZIP_STORED)
703
704    metadata = ota_metadata_pb2.OtaMetadata()
705    needed_property_files = (
706        TestPropertyFiles(),
707    )
708    output_file = common.MakeTempFile(suffix='.zip')
709    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
710    self.assertIn('ota-test-property-files', metadata.property_files)
711
712
class TestPropertyFiles(PropertyFiles):
  """PropertyFiles subclass with a fixed name and entry lists, for tests.

  It is named 'ota-test-property-files' and declares two required and two
  optional entries.
  """

  def __init__(self):
    super(TestPropertyFiles, self).__init__()
    self.name = 'ota-test-property-files'
    # Entries listed as required.
    self.required = ('required-entry1', 'required-entry2')
    # Entries listed as optional.
    self.optional = ('optional-entry1', 'optional-entry2')
727
728
class PropertyFilesTest(PropertyFilesTestCase):
  """Exercises the generic PropertyFiles logic through TestPropertyFiles."""

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Compute(self):
    """Compute() on a package that only holds the required entries."""
    names = ('required-entry1', 'required-entry2')
    package = self.construct_zip_package(names)
    helper = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = helper.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(4, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Compute_withOptionalEntries(self):
    """Compute() also reports optional entries when they are present."""
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    )
    package = self.construct_zip_package(names)
    helper = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = helper.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(6, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Compute_missingRequiredEntry(self):
    """Compute() raises KeyError when a required entry is absent."""
    package = self.construct_zip_package(('required-entry2',))
    helper = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      with self.assertRaises(KeyError):
        helper.Compute(package_zip)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize(self):
    """Finalize() with exactly the unpadded length keeps every entry."""
    payload_names = ['required-entry1', 'required-entry2']
    package = self.construct_zip_package(payload_names + [
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ])
    helper = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = helper.GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = helper.Finalize(package_zip, len(unpadded))
    parsed = self._parse_property_files_string(finalized)

    self.assertEqual(4, len(parsed))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata, so verify against the shortened names.
    expected_names = payload_names + ['metadata', 'metadata.pb']
    self._verify_entries(package, parsed, expected_names)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize_assertReservedLength(self):
    """Finalize() honors the reserved length: exact, too short, and larger."""
    package = self.construct_zip_package((
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ))
    helper = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      # The unpadded string gives the minimum length Finalize() can accept.
      unpadded = helper.GetPropertyFilesString(
          package_zip, reserve_space=False)
      exact_length = len(unpadded)

      # Exact fit: nothing is padded.
      finalized = helper.Finalize(package_zip, exact_length)
      self.assertEqual(exact_length, len(finalized))

      # One byte short: must be rejected.
      with self.assertRaises(PropertyFiles.InsufficientSpaceException):
        helper.Finalize(package_zip, exact_length - 1)

      # Extra room: the result is right-padded with spaces.
      roomy_length = exact_length + 20
      finalized = helper.Finalize(package_zip, roomy_length)
      self.assertEqual(roomy_length, len(finalized))
      self.assertEqual(' ' * 20, finalized[exact_length:])

  def test_Verify(self):
    """Verify() accepts the matching string and rejects a modified one."""
    package = self.construct_zip_package((
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ))
    helper = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = helper.GetPropertyFilesString(
          package_zip, reserve_space=False)

      # Must not raise for the string computed from this very package.
      helper.Verify(package_zip, unpadded)

      # A string with an extra trailing character must fail verification.
      with self.assertRaises(AssertionError):
        helper.Verify(package_zip, unpadded + 'x')
852
853
class StreamingPropertyFilesTest(PropertyFilesTestCase):
  """Additional validity checks specialized for StreamingPropertyFiles."""

  def test_init(self):
    """A fresh instance exposes the expected name and entry lists."""
    streaming = StreamingPropertyFiles()
    expected_required = ('payload.bin', 'payload_properties.txt')
    expected_optional = (
        'apex_info.pb',
        'care_map.pb',
        'care_map.txt',
        'compatibility.zip',
    )

    self.assertEqual('ota-streaming-property-files', streaming.name)
    self.assertEqual(expected_required, streaming.required)
    self.assertEqual(expected_optional, streaming.optional)

  def test_Compute(self):
    """Compute() reports the payload entries plus the optional ones."""
    names = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    )
    package = self.construct_zip_package(names)
    streaming = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = streaming.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(6, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Finalize(self):
    """Finalize() with exactly the unpadded length keeps every entry."""
    payload_names = [
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    ]
    package = self.construct_zip_package(payload_names + [
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ])
    streaming = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = streaming.GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = streaming.Finalize(package_zip, len(unpadded))
    parsed = self._parse_property_files_string(finalized)

    self.assertEqual(6, len(parsed))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata, so verify against the shortened names.
    expected_names = payload_names + ['metadata', 'metadata.pb']
    self._verify_entries(package, parsed, expected_names)

  def test_Verify(self):
    """Verify() accepts the matching string and rejects a modified one."""
    package = self.construct_zip_package((
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ))
    streaming = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      # The raw string, i.e. without any padding space.
      unpadded = streaming.GetPropertyFilesString(
          package_zip, reserve_space=False)

      # Must not raise for the string computed from this very package.
      streaming.Verify(package_zip, unpadded)

      # A string with an extra trailing character must fail verification.
      with self.assertRaises(AssertionError):
        streaming.Verify(package_zip, unpadded + 'x')
937
938
class AbOtaPropertyFilesTest(PropertyFilesTestCase):
  """Additional validity checks specialized for AbOtaPropertyFiles."""

  # The size for payload and metadata signature size.
  SIGNATURE_SIZE = 256

  def setUp(self):
    """Points the global signing options at the test key in testdata."""
    testdata = test_utils.get_testdata_dir()
    self.testdata_dir = testdata
    self.assertTrue(os.path.exists(testdata))

    options = common.OPTIONS
    options.wipe_user_data = False
    options.payload_signer = None
    options.payload_signer_args = None
    options.package_key = os.path.join(testdata, 'testkey')
    options.key_passwords = {options.package_key: None}

  def test_init(self):
    """A fresh instance exposes the expected name and entry lists."""
    ab_ota = AbOtaPropertyFiles()
    expected_required = ('payload.bin', 'payload_properties.txt')
    expected_optional = (
        'apex_info.pb',
        'care_map.pb',
        'care_map.txt',
        'compatibility.zip',
    )

    self.assertEqual('ota-property-files', ab_ota.name)
    self.assertEqual(expected_required, ab_ota.required)
    self.assertEqual(expected_optional, ab_ota.optional)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetPayloadMetadataOffsetAndSize(self):
    """Checks the located payload metadata against brillo_update_payload."""
    target_files = construct_target_files()
    payload = PayloadGenerator()
    payload.Generate(target_files)

    signer = PayloadSigner()
    payload.Sign(signer)

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      payload.WriteToZip(package_zip)

    # Find out the payload metadata offset and size.
    ab_ota = AbOtaPropertyFiles()
    with zipfile.ZipFile(package) as package_zip:
      # pylint: disable=protected-access
      offset, total_size = ab_ota._GetPayloadMetadataOffsetAndSize(
          package_zip)

    # The signature proto has the following format (details in
    #  /platform/system/update_engine/update_metadata.proto):
    #  message Signature {
    #    optional uint32 version = 1;
    #    optional bytes data = 2;
    #    optional fixed32 unpadded_signature_size = 3;
    #  }
    #
    # According to the protobuf encoding, the tail of the signature message
    # will be [signature string(256 bytes) + encoding of the fixed32 number
    # 256]. And 256 is encoded as '\x1d\x00\x01\x00\x00':
    # [3 (field number) << 3 | 5 (type) + byte reverse of 0x100 (256)].
    # Details in (https://developers.google.com/protocol-buffers/docs/encoding)
    size_field = b'\x1d\x00\x01\x00\x00'
    tail_length = self.SIGNATURE_SIZE + len(size_field)
    self.assertGreater(total_size, tail_length)
    with open(package, 'rb') as package_fp:
      package_fp.seek(offset + total_size - tail_length)
      proto_tail = package_fp.read(tail_length)

    self.assertEqual(size_field, proto_tail[-len(size_field):])
    embedded_signature = proto_tail[:-len(size_field)]

    # Now we extract the metadata hash via brillo_update_payload script, which
    # will serve as the oracle result.
    payload_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    hash_cmd = [
        'brillo_update_payload', 'hash',
        '--unsigned_payload', payload.payload_file,
        '--signature_size', str(self.SIGNATURE_SIZE),
        '--metadata_hash_file', metadata_hash_file,
        '--payload_hash_file', payload_hash_file,
    ]
    hash_proc = common.Run(hash_cmd)
    hash_output, _ = hash_proc.communicate()
    self.assertEqual(
        0, hash_proc.returncode,
        'Failed to run brillo_update_payload:\n{}'.format(hash_output))

    oracle_signature_file = signer.SignHashFile(metadata_hash_file)

    # Finally we can compare the two signatures.
    with open(oracle_signature_file, 'rb') as oracle_fp:
      self.assertEqual(oracle_fp.read(), embedded_signature)

  @staticmethod
  def construct_zip_package_withValidPayload(with_metadata=False):
    """Returns the path of a zip holding a signed payload plus extra entries.

    Args:
      with_metadata: Whether to also add the two META-INF/com/android metadata
          entries.
    """
    # Cannot use construct_zip_package() since we need a "valid" payload.bin.
    target_files = construct_target_files()
    payload = PayloadGenerator()
    payload.Generate(target_files)

    signer = PayloadSigner()
    payload.Sign(signer)

    extra_entries = ['care_map.txt', 'compatibility.zip']
    if with_metadata:
      extra_entries += [
          'META-INF/com/android/metadata',
          'META-INF/com/android/metadata.pb',
      ]

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      # The payload entries go in first, ...
      payload.WriteToZip(package_zip)

      # ... followed by the other entries, whose content is derived from
      # their own names.
      for name in extra_entries:
        content = name.replace('.', '-').upper()
        package_zip.writestr(name, content, zipfile.ZIP_STORED)

    return package

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Compute(self):
    """Compute() on a package with a valid payload yields seven tokens."""
    package = self.construct_zip_package_withValidPayload()
    ab_ota = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      computed = ab_ota.Compute(package_zip)

    parsed = self._parse_property_files_string(computed)
    # "7" includes the four entries above, two metadata entries, and one entry
    # for payload-metadata.bin.
    self.assertEqual(7, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize(self):
    """Finalize() with exactly the unpadded length yields seven tokens."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    ab_ota = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = ab_ota.GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = ab_ota.Finalize(package_zip, len(unpadded))

    parsed = self._parse_property_files_string(finalized)
    # "7" includes the four entries above, two metadata entries, and one entry
    # for payload-metadata.bin.
    self.assertEqual(7, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Verify(self):
    """Verify() accepts the string computed from the same package."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    ab_ota = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_zip:
      unpadded = ab_ota.GetPropertyFilesString(
          package_zip, reserve_space=False)

      ab_ota.Verify(package_zip, unpadded)
1108
1109
class PayloadSignerTest(test_utils.ReleaseToolsTestCase):
  """Tests PayloadSigner with the default signer and with external signers."""

  # Unsigned input and its expected signed counterpart, both in testdata.
  SIGFILE = 'sigfile.bin'
  SIGNED_SIGFILE = 'signed-sigfile.bin'

  def setUp(self):
    """Resets every global signing option that tests in this class touch."""
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = []
    # test_init_withExternalSigner overrides this option; reset it here so the
    # override cannot leak into whichever test happens to run next.
    common.OPTIONS.payload_signer_maximum_signature_size = None
    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key: None,
    }

  def _assertFilesEqual(self, file1, file2):
    """Asserts that the two files have identical binary content."""
    with open(file1, 'rb') as fp1, open(file2, 'rb') as fp2:
      self.assertEqual(fp1.read(), fp2.read())

  def _assertSignsToExpected(self, payload_signer):
    """Signs SIGFILE with payload_signer and compares with SIGNED_SIGFILE."""
    input_file = os.path.join(self.testdata_dir, self.SIGFILE)
    signed_file = payload_signer.SignHashFile(input_file)

    verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(verify_file, signed_file)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_init(self):
    """The default signer is openssl with a 256-byte maximum signature."""
    payload_signer = PayloadSigner()
    self.assertEqual('openssl', payload_signer.signer)
    self.assertEqual(256, payload_signer.maximum_signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_init_withPassword(self):
    """A password-protected package key still selects the openssl signer."""
    common.OPTIONS.package_key = os.path.join(
        self.testdata_dir, 'testkey_with_passwd')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key: 'foo',
    }
    payload_signer = PayloadSigner()
    self.assertEqual('openssl', payload_signer.signer)

  def test_init_withExternalSigner(self):
    """An external signer picks up its args and size limit from OPTIONS."""
    common.OPTIONS.payload_signer_args = ['arg1', 'arg2']
    common.OPTIONS.payload_signer_maximum_signature_size = '512'
    payload_signer = PayloadSigner(
        OPTIONS.package_key, OPTIONS.private_key_suffix, payload_signer='abc')
    self.assertEqual('abc', payload_signer.signer)
    self.assertEqual(['arg1', 'arg2'], payload_signer.signer_args)
    self.assertEqual(512, payload_signer.maximum_signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMaximumSignatureSizeInBytes_512Bytes(self):
    """testkey_RSA4096.key reports a 512-byte maximum signature size."""
    signing_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
    # pylint: disable=protected-access
    signature_size = PayloadSigner._GetMaximumSignatureSizeInBytes(signing_key)
    self.assertEqual(512, signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMaximumSignatureSizeInBytes_ECKey(self):
    """testkey_EC.key reports a 72-byte maximum signature size."""
    signing_key = os.path.join(self.testdata_dir, 'testkey_EC.key')
    # pylint: disable=protected-access
    signature_size = PayloadSigner._GetMaximumSignatureSizeInBytes(signing_key)
    self.assertEqual(72, signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign(self):
    """The default signer reproduces the expected signed file."""
    self._assertSignsToExpected(PayloadSigner())

  def test_Sign_withExternalSigner_openssl(self):
    """Uses openssl as the external payload signer."""
    common.OPTIONS.payload_signer_args = [
        'pkeyutl', '-sign', '-keyform', 'DER', '-inkey',
        os.path.join(self.testdata_dir, 'testkey.pk8'),
        '-pkeyopt', 'digest:sha256']
    payload_signer = PayloadSigner(
        OPTIONS.package_key, OPTIONS.private_key_suffix,
        payload_signer="openssl")
    self._assertSignsToExpected(payload_signer)

  def test_Sign_withExternalSigner_script(self):
    """Uses testdata/payload_signer.sh as the external payload signer."""
    external_signer = os.path.join(
        self.testdata_dir, 'payload_signer.sh')
    # Make sure the script is executable before handing it to PayloadSigner.
    os.chmod(external_signer, 0o700)
    common.OPTIONS.payload_signer_args = [
        os.path.join(self.testdata_dir, 'testkey.pk8')]
    payload_signer = PayloadSigner(
        OPTIONS.package_key, OPTIONS.private_key_suffix,
        payload_signer=external_signer)
    self._assertSignsToExpected(payload_signer)
1206
1207
class PayloadTest(test_utils.ReleaseToolsTestCase):
  """Tests generating, signing and packaging payloads via PayloadGenerator."""

  def setUp(self):
    """Points the global signing options at the test key in testdata."""
    testdata = test_utils.get_testdata_dir()
    self.testdata_dir = testdata
    self.assertTrue(os.path.exists(testdata))

    options = common.OPTIONS
    options.wipe_user_data = False
    options.payload_signer = None
    options.payload_signer_args = None
    options.package_key = os.path.join(testdata, 'testkey')
    options.key_passwords = {options.package_key: None}

  @staticmethod
  def _create_payload_full(secondary=False):
    """Generates a full payload from a freshly built target-files zip."""
    target_files = construct_target_files(secondary)
    generator = PayloadGenerator(secondary, OPTIONS.wipe_user_data)
    generator.Generate(target_files)
    return generator

  @staticmethod
  def _create_payload_incremental():
    """Generates an incremental payload between two target-files zips."""
    target_files = construct_target_files()
    source_files = construct_target_files()
    generator = PayloadGenerator()
    generator.Generate(target_files, source_files)
    return generator

  @staticmethod
  def _write_payload_to_temp_zip(payload):
    """Writes payload into a new temporary zip and returns the zip's path."""
    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_zip:
      payload.WriteToZip(package_zip)
    return package

  def _sign_and_verify(self, payload):
    """Signs payload, packages it, and verifies it with the test cert."""
    payload.Sign(PayloadSigner())
    package = self._write_payload_to_temp_zip(payload)

    import check_ota_package_signature
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    check_ota_package_signature.VerifyAbOtaPayload(cert, package)

  def _sign_and_check_properties(self, payload, expected_property):
    """Signs and packages payload, then looks for expected_property."""
    payload.Sign(PayloadSigner())
    # The zip itself is throwaway; only the properties file is inspected.
    with tempfile.NamedTemporaryFile() as scratch:
      with zipfile.ZipFile(scratch, "w") as scratch_zip:
        payload.WriteToZip(scratch_zip)

    with open(payload.payload_properties) as properties_fp:
      self.assertIn(expected_property, properties_fp.read())

  def _assert_entries_stored(self, package, entry_names):
    """Asserts that entry_names exist in package and are uncompressed."""
    with zipfile.ZipFile(package) as package_zip:
      # First make sure we have the essential entries.
      present = package_zip.namelist()
      for name in entry_names:
        self.assertIn(name, present)

      # Then assert these entries are stored.
      for info in package_zip.infolist():
        if info.filename in entry_names:
          self.assertEqual(zipfile.ZIP_STORED, info.compress_type)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_full(self):
    """A full payload produces a payload file on disk."""
    generated = self._create_payload_full()
    self.assertTrue(os.path.exists(generated.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_incremental(self):
    """An incremental payload produces a payload file on disk."""
    generated = self._create_payload_incremental()
    self.assertTrue(os.path.exists(generated.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_additionalArgs(self):
    """The source image can be passed through additional_args instead."""
    target_files = construct_target_files()
    source_files = construct_target_files()
    generator = PayloadGenerator()
    # This should work the same as calling generator.Generate(target_files,
    # source_files).
    extra_args = ["--source_image", source_files]
    generator.Generate(target_files, additional_args=extra_args)
    self.assertTrue(os.path.exists(generator.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_invalidInput(self):
    """Generate() raises ExternalError once IMAGES/vendor.img is removed."""
    target_files = construct_target_files()
    common.ZipDelete(target_files, 'IMAGES/vendor.img')
    generator = PayloadGenerator()
    with self.assertRaises(common.ExternalError):
      generator.Generate(target_files)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_full(self):
    """A signed full payload passes VerifyAbOtaPayload."""
    self._sign_and_verify(self._create_payload_full())

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_incremental(self):
    """A signed incremental payload passes VerifyAbOtaPayload."""
    self._sign_and_verify(self._create_payload_incremental())

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_withDataWipe(self):
    """With wipe_user_data set, the properties carry POWERWASH=1."""
    common.OPTIONS.wipe_user_data = True
    self._sign_and_check_properties(
        self._create_payload_full(), "POWERWASH=1")

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_secondary(self):
    """A secondary payload's properties carry SWITCH_SLOT_ON_REBOOT=0."""
    self._sign_and_check_properties(
        self._create_payload_full(secondary=True), "SWITCH_SLOT_ON_REBOOT=0")

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_badSigner(self):
    """Tests that signing failure can be captured."""
    generated = self._create_payload_full()
    broken_signer = PayloadSigner()
    broken_signer.signer_args.append('bad-option')
    with self.assertRaises(common.ExternalError):
      generated.Sign(broken_signer)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip(self):
    """WriteToZip() stores the primary payload entries uncompressed."""
    generated = self._create_payload_full()
    generated.Sign(PayloadSigner())
    package = self._write_payload_to_temp_zip(generated)

    self._assert_entries_stored(package, (
        PayloadGenerator.PAYLOAD_BIN,
        PayloadGenerator.PAYLOAD_PROPERTIES_TXT,
    ))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip_secondary(self):
    """WriteToZip() stores the secondary payload entries uncompressed."""
    generated = self._create_payload_full(secondary=True)
    generated.Sign(PayloadSigner())
    package = self._write_payload_to_temp_zip(generated)

    self._assert_entries_stored(package, (
        PayloadGenerator.SECONDARY_PAYLOAD_BIN,
        PayloadGenerator.SECONDARY_PAYLOAD_PROPERTIES_TXT,
    ))
1368
1369
1370class RuntimeFingerprintTest(test_utils.ReleaseToolsTestCase):
  # Lines of the META/misc_info.txt file that setUp() writes for every test.
  MISC_INFO = [
      'recovery_api_version=3',
      'fstab_version=2',
      'recovery_as_boot=true',
      'ab_update=true',
  ]

  # Base lines of SYSTEM/build.prop. Note that no plain ro.product.{brand,
  # name,device} entries are listed here; one test below appends them itself.
  BUILD_PROP = [
      'ro.build.id=build-id',
      'ro.build.version.incremental=version-incremental',
      'ro.build.type=build-type',
      'ro.build.tags=build-tags',
      'ro.build.version.release=version-release',
      'ro.build.version.release_or_codename=version-release',
      'ro.build.version.sdk=30',
      'ro.build.version.security_patch=2020',
      'ro.build.date.utc=12345678',
      'ro.system.build.version.release=version-release',
      'ro.system.build.id=build-id',
      'ro.system.build.version.incremental=version-incremental',
      'ro.system.build.type=build-type',
      'ro.system.build.tags=build-tags',
      'ro.system.build.version.sdk=30',
      'ro.system.build.version.security_patch=2020',
      'ro.system.build.date.utc=12345678',
      'ro.product.system.brand=generic',
      'ro.product.system.name=generic',
      'ro.product.system.device=generic',
  ]

  # Base lines of VENDOR/build.prop; some tests append 'import' lines to a
  # copy of this list.
  VENDOR_BUILD_PROP = [
      'ro.vendor.build.version.release=version-release',
      'ro.vendor.build.id=build-id',
      'ro.vendor.build.version.incremental=version-incremental',
      'ro.vendor.build.type=build-type',
      'ro.vendor.build.tags=build-tags',
      'ro.vendor.build.version.sdk=30',
      'ro.vendor.build.version.security_patch=2020',
      'ro.vendor.build.date.utc=12345678',
      'ro.product.vendor.brand=vendor-product-brand',
      'ro.product.vendor.name=vendor-product-name',
      'ro.product.vendor.device=vendor-product-device'
  ]
1414
1415  def setUp(self):
1416    common.OPTIONS.oem_dicts = None
1417    self.test_dir = common.MakeTempDir()
1418    self.writeFiles({'META/misc_info.txt': '\n'.join(self.MISC_INFO)},
1419                    self.test_dir)
1420
1421  def writeFiles(self, contents_dict, out_dir):
1422    for path, content in contents_dict.items():
1423      abs_path = os.path.join(out_dir, path)
1424      dir_name = os.path.dirname(abs_path)
1425      if not os.path.exists(dir_name):
1426        os.makedirs(dir_name)
1427      with open(abs_path, 'w') as f:
1428        f.write(content)
1429
1430  @staticmethod
1431  def constructFingerprint(prefix):
1432    return '{}:version-release/build-id/version-incremental:' \
1433           'build-type/build-tags'.format(prefix)
1434
1435  def test_CalculatePossibleFingerprints_no_dynamic_fingerprint(self):
1436    build_prop = copy.deepcopy(self.BUILD_PROP)
1437    build_prop.extend([
1438        'ro.product.brand=product-brand',
1439        'ro.product.name=product-name',
1440        'ro.product.device=product-device',
1441    ])
1442    self.writeFiles({
1443        'SYSTEM/build.prop': '\n'.join(build_prop),
1444        'VENDOR/build.prop': '\n'.join(self.VENDOR_BUILD_PROP),
1445    }, self.test_dir)
1446
1447    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1448    expected = ({'product-device'},
1449                {self.constructFingerprint(
1450                    'product-brand/product-name/product-device')})
1451    self.assertEqual(expected,
1452                     CalculateRuntimeDevicesAndFingerprints(build_info, {}))
1453
1454  def test_CalculatePossibleFingerprints_single_override(self):
1455    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1456    vendor_build_prop.extend([
1457        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1458    ])
1459    self.writeFiles({
1460        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1461        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1462        'VENDOR/etc/build_std.prop':
1463        'ro.product.vendor.name=vendor-product-std',
1464        'VENDOR/etc/build_pro.prop':
1465        'ro.product.vendor.name=vendor-product-pro',
1466    }, self.test_dir)
1467
1468    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1469    boot_variable_values = {'ro.boot.sku_name': ['std', 'pro']}
1470
1471    expected = ({'vendor-product-device'}, {
1472        self.constructFingerprint(
1473            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1474        self.constructFingerprint(
1475            'vendor-product-brand/vendor-product-std/vendor-product-device'),
1476        self.constructFingerprint(
1477            'vendor-product-brand/vendor-product-pro/vendor-product-device'),
1478    })
1479    self.assertEqual(
1480        expected, CalculateRuntimeDevicesAndFingerprints(
1481            build_info, boot_variable_values))
1482
1483  def test_CalculatePossibleFingerprints_multiple_overrides(self):
1484    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1485    vendor_build_prop.extend([
1486        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1487        'import /vendor/etc/build_${ro.boot.device_name}.prop',
1488    ])
1489    self.writeFiles({
1490        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1491        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1492        'VENDOR/etc/build_std.prop':
1493        'ro.product.vendor.name=vendor-product-std',
1494        'VENDOR/etc/build_product1.prop':
1495        'ro.product.vendor.device=vendor-device-product1',
1496        'VENDOR/etc/build_pro.prop':
1497        'ro.product.vendor.name=vendor-product-pro',
1498        'VENDOR/etc/build_product2.prop':
1499        'ro.product.vendor.device=vendor-device-product2',
1500    }, self.test_dir)
1501
1502    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1503    boot_variable_values = {
1504        'ro.boot.sku_name': ['std', 'pro'],
1505        'ro.boot.device_name': ['product1', 'product2'],
1506    }
1507
1508    expected_devices = {'vendor-product-device', 'vendor-device-product1',
1509                        'vendor-device-product2'}
1510    expected_fingerprints = {
1511        self.constructFingerprint(
1512            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1513        self.constructFingerprint(
1514            'vendor-product-brand/vendor-product-std/vendor-device-product1'),
1515        self.constructFingerprint(
1516            'vendor-product-brand/vendor-product-pro/vendor-device-product1'),
1517        self.constructFingerprint(
1518            'vendor-product-brand/vendor-product-std/vendor-device-product2'),
1519        self.constructFingerprint(
1520            'vendor-product-brand/vendor-product-pro/vendor-device-product2')
1521    }
1522    self.assertEqual((expected_devices, expected_fingerprints),
1523                     CalculateRuntimeDevicesAndFingerprints(
1524                         build_info, boot_variable_values))
1525
1526  def test_GetPackageMetadata_full_package(self):
1527    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1528    vendor_build_prop.extend([
1529        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1530    ])
1531    self.writeFiles({
1532        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1533        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1534        'VENDOR/etc/build_std.prop':
1535        'ro.product.vendor.name=vendor-product-std',
1536        'VENDOR/etc/build_pro.prop':
1537        'ro.product.vendor.name=vendor-product-pro',
1538        AB_PARTITIONS: '\n'.join(['system', 'vendor']),
1539    }, self.test_dir)
1540
1541    common.OPTIONS.boot_variable_file = common.MakeTempFile()
1542    with open(common.OPTIONS.boot_variable_file, 'w') as f:
1543      f.write('ro.boot.sku_name=std,pro')
1544
1545    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1546    metadata_dict = BuildLegacyOtaMetadata(GetPackageMetadata(build_info))
1547    self.assertEqual('vendor-product-device', metadata_dict['pre-device'])
1548    fingerprints = [
1549        self.constructFingerprint(
1550            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1551        self.constructFingerprint(
1552            'vendor-product-brand/vendor-product-pro/vendor-product-device'),
1553        self.constructFingerprint(
1554            'vendor-product-brand/vendor-product-std/vendor-product-device'),
1555    ]
1556    self.assertEqual('|'.join(fingerprints), metadata_dict['post-build'])
1557
1558  def CheckMetadataEqual(self, metadata_dict, metadata_proto):
1559    post_build = metadata_proto.postcondition
1560    self.assertEqual('|'.join(post_build.build),
1561                     metadata_dict['post-build'])
1562    self.assertEqual(post_build.build_incremental,
1563                     metadata_dict['post-build-incremental'])
1564    self.assertEqual(post_build.sdk_level,
1565                     metadata_dict['post-sdk-level'])
1566    self.assertEqual(post_build.security_patch_level,
1567                     metadata_dict['post-security-patch-level'])
1568
1569    if metadata_proto.type == ota_metadata_pb2.OtaMetadata.AB:
1570      ota_type = 'AB'
1571    elif metadata_proto.type == ota_metadata_pb2.OtaMetadata.BLOCK:
1572      ota_type = 'BLOCK'
1573    else:
1574      ota_type = ''
1575    self.assertEqual(ota_type, metadata_dict['ota-type'])
1576    self.assertEqual(metadata_proto.wipe,
1577                     metadata_dict.get('ota-wipe') == 'yes')
1578    self.assertEqual(metadata_proto.required_cache,
1579                     int(metadata_dict.get('ota-required-cache', 0)))
1580    self.assertEqual(metadata_proto.retrofit_dynamic_partitions,
1581                     metadata_dict.get(
1582                         'ota-retrofit-dynamic-partitions') == 'yes')
1583
1584  def test_GetPackageMetadata_incremental_package(self):
1585    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1586    vendor_build_prop.extend([
1587        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1588    ])
1589    self.writeFiles({
1590        'META/misc_info.txt': '\n'.join(self.MISC_INFO),
1591        'META/ab_partitions.txt': '\n'.join(['system', 'vendor', 'product']),
1592        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1593        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1594        'VENDOR/etc/build_std.prop':
1595        'ro.product.vendor.device=vendor-device-std',
1596        'VENDOR/etc/build_pro.prop':
1597        'ro.product.vendor.device=vendor-device-pro',
1598    }, self.test_dir)
1599
1600    common.OPTIONS.boot_variable_file = common.MakeTempFile()
1601    with open(common.OPTIONS.boot_variable_file, 'w') as f:
1602      f.write('ro.boot.sku_name=std,pro')
1603
1604    source_dir = common.MakeTempDir()
1605    source_build_prop = [
1606        'ro.build.version.release=source-version-release',
1607        'ro.build.id=source-build-id',
1608        'ro.build.version.incremental=source-version-incremental',
1609        'ro.build.type=build-type',
1610        'ro.build.tags=build-tags',
1611        'ro.build.version.sdk=29',
1612        'ro.build.version.security_patch=2020',
1613        'ro.build.date.utc=12340000',
1614        'ro.system.build.version.release=source-version-release',
1615        'ro.system.build.id=source-build-id',
1616        'ro.system.build.version.incremental=source-version-incremental',
1617        'ro.system.build.type=build-type',
1618        'ro.system.build.tags=build-tags',
1619        'ro.system.build.version.sdk=29',
1620        'ro.system.build.version.security_patch=2020',
1621        'ro.system.build.date.utc=12340000',
1622        'ro.product.system.brand=generic',
1623        'ro.product.system.name=generic',
1624        'ro.product.system.device=generic',
1625    ]
1626    self.writeFiles({
1627        'META/misc_info.txt': '\n'.join(self.MISC_INFO),
1628        'META/ab_partitions.txt': '\n'.join(['system', 'vendor', 'product']),
1629        'SYSTEM/build.prop': '\n'.join(source_build_prop),
1630        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1631        'VENDOR/etc/build_std.prop':
1632        'ro.product.vendor.device=vendor-device-std',
1633        'VENDOR/etc/build_pro.prop':
1634        'ro.product.vendor.device=vendor-device-pro',
1635    }, source_dir)
1636    common.OPTIONS.incremental_source = source_dir
1637
1638    target_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1639    source_info = common.BuildInfo(common.LoadInfoDict(source_dir))
1640
1641    metadata_proto = GetPackageMetadata(target_info, source_info)
1642    metadata_dict = BuildLegacyOtaMetadata(metadata_proto)
1643    self.assertEqual(
1644        'vendor-device-pro|vendor-device-std|vendor-product-device',
1645        metadata_dict['pre-device'])
1646    source_suffix = ':source-version-release/source-build-id/' \
1647                    'source-version-incremental:build-type/build-tags'
1648    pre_fingerprints = [
1649        'vendor-product-brand/vendor-product-name/vendor-device-pro'
1650        '{}'.format(source_suffix),
1651        'vendor-product-brand/vendor-product-name/vendor-device-std'
1652        '{}'.format(source_suffix),
1653        'vendor-product-brand/vendor-product-name/vendor-product-device'
1654        '{}'.format(source_suffix),
1655    ]
1656    self.assertEqual('|'.join(pre_fingerprints), metadata_dict['pre-build'])
1657
1658    post_fingerprints = [
1659        self.constructFingerprint(
1660            'vendor-product-brand/vendor-product-name/vendor-device-pro'),
1661        self.constructFingerprint(
1662            'vendor-product-brand/vendor-product-name/vendor-device-std'),
1663        self.constructFingerprint(
1664            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1665    ]
1666    self.assertEqual('|'.join(post_fingerprints), metadata_dict['post-build'])
1667
1668    self.CheckMetadataEqual(metadata_dict, metadata_proto)
1669
1670    pre_partition_states = metadata_proto.precondition.partition_state
1671    self.assertEqual(2, len(pre_partition_states))
1672    self.assertEqual('system', pre_partition_states[0].partition_name)
1673    self.assertEqual(['generic'], pre_partition_states[0].device)
1674    self.assertEqual(['generic/generic/generic{}'.format(source_suffix)],
1675                     pre_partition_states[0].build)
1676
1677    self.assertEqual('vendor', pre_partition_states[1].partition_name)
1678    self.assertEqual(['vendor-device-pro', 'vendor-device-std',
1679                      'vendor-product-device'], pre_partition_states[1].device)
1680    vendor_fingerprints = post_fingerprints
1681    self.assertEqual(vendor_fingerprints, pre_partition_states[1].build)
1682
1683    post_partition_states = metadata_proto.postcondition.partition_state
1684    self.assertEqual(2, len(post_partition_states))
1685    self.assertEqual('system', post_partition_states[0].partition_name)
1686    self.assertEqual(['generic'], post_partition_states[0].device)
1687    self.assertEqual([self.constructFingerprint('generic/generic/generic')],
1688                     post_partition_states[0].build)
1689
1690    self.assertEqual('vendor', post_partition_states[1].partition_name)
1691    self.assertEqual(['vendor-device-pro', 'vendor-device-std',
1692                      'vendor-product-device'], post_partition_states[1].device)
1693    self.assertEqual(vendor_fingerprints, post_partition_states[1].build)
1694