• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2
3# Copyright 2019, The Android Open Source Project
4#
5# Permission is hereby granted, free of charge, to any person
6# obtaining a copy of this software and associated documentation
7# files (the "Software"), to deal in the Software without
8# restriction, including without limitation the rights to use, copy,
9# modify, merge, publish, distribute, sublicense, and/or sell copies
10# of the Software, and to permit persons to whom the Software is
11# furnished to do so, subject to the following conditions:
12#
13# The above copyright notice and this permission notice shall be
14# included in all copies or substantial portions of the Software.
15#
16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23# SOFTWARE.
24#
25"""Unit tests for aftltool."""
26
27import argparse
28import binascii
29import io
30import os
31import struct
32import sys
33import tempfile
34import unittest
35
36import aftltool
37import avbtool
38
39# pylint: disable=import-error
40import api_pb2
41# pylint: enable=import-error
42
43
# Workaround for b/149307145 in order to pick up the test data from the right
# location, independent of where the script is called from.
# TODO(b/149307145): Remove workaround once the referenced bug is fixed.
TEST_EXEC_PATH = os.path.dirname(os.path.realpath(__file__))
48
class TlsDataTest(unittest.TestCase):
  """Tests aftltool.tls_decode_bytes and aftltool.tls_encode_bytes."""

  def test_decode(self):
    """Decodes values whose size prefix is given as 'B', 'H' and 'L'."""
    # Each row: (byte_size, encoded input, expected value, unread remainder).
    cases = (
        ('B', b'\x01\x02', b'\x02', b''),
        ('H', b'\x00\x01\x03\xff', b'\x03', b'\xff'),
        ('L', b'\x00\x00\x00\x02\x04\x05\xff\xff', b'\x04\x05', b'\xff\xff'),
    )
    for byte_size, encoded, expected, remainder in cases:
      stream = io.BytesIO(encoded)
      decoded = aftltool.tls_decode_bytes(byte_size, stream)
      self.assertEqual(decoded, expected)
      # Only the size prefix and the value itself may have been consumed.
      self.assertEqual(stream.read(), remainder)

  def test_decode_invalid(self):
    """Checks that malformed decode requests raise AftlError."""
    cases = (
        ('B', b''),                # Insufficient data for reading the size.
        ('/o/', b'\x01\x02\xff'),  # Invalid byte_size character.
        ('B', b'\x01'),            # Insufficient data for reading the value.
    )
    for byte_size, encoded in cases:
      with self.assertRaises(aftltool.AftlError):
        aftltool.tls_decode_bytes(byte_size, io.BytesIO(encoded))

  def test_encode(self):
    """Encodes the same payload with 'B' and 'H' size prefixes."""
    payload = b'\x01\x02\x03\x04'
    # Each row: (byte_size, expected size prefix in front of the payload).
    cases = (
        ('B', b'\x04'),
        ('H', b'\x00\x04'),
    )
    for byte_size, prefix in cases:
      sink = io.BytesIO()
      aftltool.tls_encode_bytes(byte_size, payload, sink)
      self.assertEqual(sink.getvalue(), prefix + payload)

  def test_encode_invalid(self):
    """Checks that unencodable requests raise AftlError."""
    cases = (
        ('B', b'\x01' * 256),  # Byte size is not large enough for the value.
        ('/o/', b'\x01\x02'),  # Invalid byte_size character.
    )
    for byte_size, payload in cases:
      sink = io.BytesIO()
      with self.assertRaises(aftltool.AftlError):
        aftltool.tls_encode_bytes(byte_size, payload, sink)
99
100
class VBMetaPrimaryAnnotationTest(unittest.TestCase):
  """Tests parsing and encoding of aftltool.VBMetaPrimaryAnnotation."""

  # Encoded form used by these tests for an annotation with all fields empty.
  _EMPTY_ANNOTATION_BYTES = b'\x00' * 5

  def test_decode(self):
    """Parses the all-empty encoding and checks every field is empty."""
    parsed = aftltool.VBMetaPrimaryAnnotation.parse(
        io.BytesIO(self._EMPTY_ANNOTATION_BYTES))
    self.assertEqual(parsed.vbmeta_hash, b'')
    self.assertEqual(parsed.version_incremental, '')
    self.assertEqual(parsed.manufacturer_key_hash, b'')
    self.assertEqual(parsed.description, '')

  def test_encode(self):
    """Encodes a default-constructed annotation."""
    sink = io.BytesIO()
    aftltool.VBMetaPrimaryAnnotation().encode(sink)
    self.assertEqual(sink.getvalue(), self._EMPTY_ANNOTATION_BYTES)

  def test_encode_invalid(self):
    """Checks that a non-ASCII version_incremental cannot be encoded."""
    annotation = aftltool.VBMetaPrimaryAnnotation()
    # Version incremental should be ASCII only.
    annotation.version_incremental = '☃'
    sink = io.BytesIO()
    with self.assertRaises(aftltool.AftlError):
      annotation.encode(sink)
124
125
class SignedVBMetaAnnotationLeafTest(unittest.TestCase):
  """Tests encoding of aftltool.SignedVBMetaPrimaryAnnotationLeaf."""

  def _assert_version_rejected(self, version):
    """Asserts that encoding a leaf with the given version raises AftlError."""
    leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf()
    leaf.version = version
    with self.assertRaises(aftltool.AftlError):
      leaf.encode()

  def test_encode(self):
    """Encodes a default-constructed leaf."""
    expected = b''.join((
        b'\x01',      # Version
        b'\x00' * 8,  # Timestamp
        b'\x01',      # Leaf Type
        b'\x00' * 4,  # Empty Signature
        b'\x00' * 5,  # Empty Annotation
    ))
    encoded = aftltool.SignedVBMetaPrimaryAnnotationLeaf().encode()
    self.assertEqual(encoded, expected)

  def test_encode_invalid_type(self):
    """Checks that a non-integer version is rejected."""
    # The version field must be a 1-byte integer.
    self._assert_version_rejected('x')

  def test_encode_invalid_size(self):
    """Checks that the version value 256 is rejected."""
    self._assert_version_rejected(256)
149
150
class AftltoolTestCase(unittest.TestCase):
  """Base test case providing the fixtures shared by the aftltool tests."""

  def setUp(self):
    """Sets up the test bed for the unit tests."""
    super(AftltoolTestCase, self).setUp()

    # Redirects the stderr to /dev/null when running the unittests. The reason
    # is that soong interprets any output on stderr as an error and marks the
    # unit test as failed although the test itself succeeded.
    self.stderr = sys.stderr
    self.null = open(os.devnull, 'wt')
    # unittest skips tearDown() when setUp() raises, but it still runs the
    # registered cleanups. Registering the restore here guarantees that stderr
    # is reconnected and the handle is closed even if building one of the
    # fixtures below (or in a subclass setUp) fails.
    self.addCleanup(self._restore_stderr)
    sys.stderr = self.null

    # AFTL public key.
    self.test_aftl_pub_key = (
        '-----BEGIN PUBLIC KEY-----\n'
        'MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA4ilqCNsenNA013iCdwgD\n'
        'YPxZ853nbHG9lMBp9boXiwRcqT/8bUKHIL7YX5z7s+QoRYVY3rkMKppRabclXzyx\n'
        'H59YnPMaU4uv7NqwWzjgaZo7E+vo7IF+KBjV3cJulId5Av0yIYUCsrwd7MpGtWdC\n'
        'Q3S+7Vd4zwzCKEhcvliNIhnNlp1U3wNkPCxOyCAsMEn6k8O5ar12ke5TvxDv15db\n'
        'rPDeHh8G2OYWoCkWL+lSN35L2kOJqKqVbLKWrrOd96RCYrrtbPCi580OADJRcUlG\n'
        'lgcjwmNwmypBWvQMZ6ITj0P0ksHnl1zZz1DE2rXe1goLI1doghb5KxLaezlR8c2C\n'
        'E3w/uo9KJgNmNgUVzzqZZ6FE0moyIDNOpP7KtZAL0DvEZj6jqLbB0ccPQElrg52m\n'
        'Dv2/A3nYSr0mYBKeskT4+Bg7PGgoC8p7WyLSxMyzJEDYdtrj9OFx6eZaA23oqTQx\n'
        'k3Qq5H8RfNBeeSUEeKF7pKH/7gyqZ2bNzBFMA2EBZgBozwRfaeN/HCv3qbaCnwvu\n'
        '6caacmAsK+RxiYxSL1QsJqyhCWWGxVyenmxdc1KG/u5ypi7OIioztyzR3t2tAzD3\n'
        'Nb+2t8lgHBRxbV24yiPlnvPmB1ZYEctXnlRR9Evpl1o9xA9NnybPHKr9rozN39CZ\n'
        'V/USB8K6ao1y5xPZxa8CZksCAwEAAQ==\n'
        '-----END PUBLIC KEY-----\n')

    # Test AftlIcpEntry #1
    self.test_tl_url_1 = 'aftl-test-server.google.com'

    self.test_sth_1 = aftltool.TrillianLogRootDescriptor()
    self.test_sth_1.tree_size = 2
    self.test_sth_1.root_hash_size = 32
    self.test_sth_1.root_hash = b'f' * 32
    self.test_sth_1.timestamp = 0x1234567890ABCDEF
    self.test_sth_1.revision = 0xFEDCBA0987654321

    self.test_sth_1_bytes = (
        b'\x00\x01'                          # version
        b'\x00\x00\x00\x00\x00\x00\x00\x02'  # tree_size
        b'\x20'                              # root_hash_size
        + b'f' * 32 +                        # root_hash
        b'\x12\x34\x56\x78\x90\xAB\xCD\xEF'  # timestamp
        b'\xFE\xDC\xBA\x09\x87\x65\x43\x21'  # revision
        b'\x00\x00'                          # metadata_size
        b''                                  # metadata (empty)
    )

    # Test Annotation #1
    anno_1 = aftltool.VBMetaPrimaryAnnotation(vbmeta_hash=b'w'*32,
                                              version_incremental='x'*5,
                                              manufacturer_key_hash=b'y'*32,
                                              description='z'*51)
    signed_anno_1 = aftltool.SignedVBMetaPrimaryAnnotation(annotation=anno_1)

    self.test_anno_1 = aftltool.SignedVBMetaPrimaryAnnotationLeaf(
        signed_vbmeta_primary_annotation=signed_anno_1)
    self.test_anno_1_bytes = (
        b'\x01'                              # version
        b'\x00\x00\x00\x00\x00\x00\x00\x00'  # timestamp
        b'\x01'                              # leaf_type
        b'\x00'                              # hash_algorithm
        b'\x00'                              # signature_algorithm
        + b'\x00\x00'                        # signature
        + b'\x20' + b'w' * 32                # vbmeta_hash
        + b'\x05' + b'x' * 5                 # version_incremental
        + b'\x20' + b'y' * 32                # manufacturer_key_hash
        + b'\x00\x33' + b'z' * 51            # description
    )

    # Fill each structure with an easily observable pattern for easy validation.
    self.test_proof_hashes_1 = []
    self.test_proof_hashes_1.append(b'b' * 32)
    self.test_proof_hashes_1.append(b'c' * 32)
    self.test_proof_hashes_1.append(b'd' * 32)
    self.test_proof_hashes_1.append(b'e' * 32)

    # Valid test AftlIcpEntry #1.
    self.test_entry_1 = aftltool.AftlIcpEntry()
    self.test_entry_1.log_url = self.test_tl_url_1
    self.test_entry_1.leaf_index = 1
    self.test_entry_1.annotation_leaf = self.test_anno_1
    self.test_entry_1.log_root_descriptor = self.test_sth_1
    self.test_entry_1.proofs = self.test_proof_hashes_1
    self.test_entry_1.log_root_signature = b'g' * 512

    self.test_entry_1_bytes = (
        b'\x00\x00\x00\x1b'                  # Transparency log url size.
        b'\x00\x00\x00\x00\x00\x00\x00\x01'  # Leaf index.
        b'\x00\x00\x00\x3d'                  # Log root descriptor size.
        b'\x00\x00\x00\x8b'                  # Annotation leaf size.
        b'\x02\x00'                          # Log root signature size.
        b'\x04'                              # Number of hashes in ICP.
        b'\x00\x00\x00\x80'                  # Size of ICP in bytes.
        + self.test_tl_url_1.encode('ascii') # Transparency log url.
        + self.test_sth_1_bytes
        + self.test_anno_1_bytes
        + b'g' * 512                         # Log root signature.
        + b'b' * 32                          # Hashes...
        + b'c' * 32
        + b'd' * 32
        + b'e' * 32)

    # Valid test AftlIcpEntry #2.
    self.test_tl_url_2 = 'aftl-test-server.google.ch'

    self.test_sth_2 = aftltool.TrillianLogRootDescriptor()
    self.test_sth_2.tree_size = 4
    self.test_sth_2.root_hash_size = 32
    self.test_sth_2.root_hash = b'e' * 32
    self.test_sth_2.timestamp = 6
    self.test_sth_2.revision = 7
    self.test_sth_2.metadata_size = 2
    self.test_sth_2.metadata = b'12'

    self.test_sth_2_bytes = (
        b'\x00\x01'                          # version
        b'\x00\x00\x00\x00\x00\x00\x00\x04'  # tree_size
        b'\x20'                              # root_hash_size
        + b'e' * 32 +                        # root_hash
        b'\x00\x00\x00\x00\x00\x00\x00\x06'  # timestamp
        b'\x00\x00\x00\x00\x00\x00\x00\x07'  # revision
        b'\x00\x02'                          # metadata_size
        b'12'                                # metadata
    )

    # Fill each structure with an easily observable pattern for easy validation.
    self.test_proof_hashes_2 = []
    self.test_proof_hashes_2.append(b'g' * 32)
    self.test_proof_hashes_2.append(b'h' * 32)

    self.test_entry_2 = aftltool.AftlIcpEntry()
    self.test_entry_2.log_url = self.test_tl_url_2
    self.test_entry_2.leaf_index = 2
    self.test_entry_2.annotation_leaf = self.test_anno_1
    self.test_entry_2.log_root_descriptor = self.test_sth_2
    self.test_entry_2.log_root_signature = b'd' * 512
    self.test_entry_2.proofs = self.test_proof_hashes_2

    self.test_entry_2_bytes = (
        b'\x00\x00\x00\x1a'                   # Transparency log url size.
        b'\x00\x00\x00\x00\x00\x00\x00\x02'   # Leaf index.
        b'\x00\x00\x00\x3f'                   # Log root descriptor size.
        b'\x00\x00\x00\x8b'                   # Annotation leaf size.
        b'\x02\x00'                           # Log root signature size.
        b'\x02'                               # Number of hashes in ICP.
        b'\x00\x00\x00\x40'                   # Size of ICP in bytes.
        + self.test_tl_url_2.encode('ascii')  # Transparency log url.
        + self.test_sth_2_bytes               # Log root
        + self.test_anno_1_bytes
        + b'd' * 512                          # Log root signature.
        + b'g' * 32                           # Hashes...
        + b'h' * 32)

    # Valid test AftlImage made out of AftlEntry #1 and #2.
    self.test_aftl_desc = aftltool.AftlImage()
    self.test_aftl_desc.add_icp_entry(self.test_entry_1)
    self.test_aftl_desc.add_icp_entry(self.test_entry_2)

    self.test_expected_aftl_image_bytes = (
        b'AFTL'                                         # Magic.
        + struct.pack('!L', avbtool.AVB_VERSION_MAJOR)  # Major version.
        + struct.pack('!L', avbtool.AVB_VERSION_MINOR)  # Minor version.
        + b'\x00\x00\x06\xcf'                           # Image size.
        b'\x00\x02'                                     # Number of ICP entries.
        + self.test_entry_1_bytes
        + self.test_entry_2_bytes)

    self.test_avbm_resp = api_pb2.AddVBMetaResponse()
    self.test_avbm_resp.annotation_proof.proof.leaf_index = 9127
    hashes = [
        '61076ca285b4982669e67757f55682ddc43ab5c11ba671260f82a8efa8831f94',
        '89c2fbcc58da25a65ce5e9b4fb22aaf208b20601f0bc023f73f05d35bc1f3bac',
        '75d26b5f754b4bed332a3ce2a2bfea0334706a974b7e00ee663f0279fa8b446e',
        'e1cd9c96feb893b5ef7771e424ac1c6c47509c2b98bc578d22ad07369c9641aa',
        'e83e0e4dd352b1670a55f93f88781a73bb41efcadb9927399f59459dfa14bc40',
        '8d5d25996117c88655d66f685baa3c94390867a040507b10587b17fbe92b496a',
        '5de4c627e9ca712f207d6056f56f0d3286ed4a5381ed7f3cc1aa470217734138',
        '19acfdb424d7fe28d1f850c76302f78f9a50146a5b9c65f9fdfbbc0173fd6993']
    for h in hashes:
      self.test_avbm_resp.annotation_proof.proof.hashes.append(
          binascii.unhexlify(h))
    self.test_avbm_resp.annotation_proof.sth.key_hint = binascii.unhexlify(
        '5af859abce8fe1ea')
    self.test_avbm_resp.annotation_proof.sth.log_root = binascii.unhexlify(
        '0001'
        '00000000000023a8'
        '20'
        '9a5f71340f8dc98bdc6320f976dda5f34db8554cb273ba5ab60f1697c519d6f6'
        '1609ae15024774b1'
        '0000000000001e5a'
        '0000'
    )
    self.test_avbm_resp.annotation_proof.sth.log_root_signature = (
        binascii.unhexlify(
            '7c37903cc76e8689a6b31da9ad56c3daeb6194029510297cc7d147278390da33'
            '09c4d9eb1f6be0cdcd1de5315b0b3b573cc9fcd8620d3fab956abbe3c597a572'
            '46e5a5d277c4cc4b590872d0292fa64e1d3285626b1dedeb00b6aa0a7a0717c0'
            '7d4c89b68fda9091be06180be1369675a7c4ce7f42cca133ef0daf8dcc5ba1ee'
            '930cef6dcb71b0a7690446e19661c8e18c089a5d6f6fc9299a0592efb33a4db5'
            '4c640027fa4f0ad0009f8bf75ec5fc17e0fa1091fabe74fe52738443745066ab'
            '48f99b297809b863c01016abda17a2479fce91f9929c60bc2ce15e474204fc5a'
            '8e79b2190aadb7c149671e8c76a4da506860f8d6020fb2eaabfee025cc267bad'
            '3c8257186c8aaf1da9eefe50cae4b3e8deb66033ebc4bfcda2b317f9e7d2dd78'
            'b47f2d86795815d82058ad4cba8fc7983a3bbf843e9b8c7ec7f1ae137be6848d'
            '03c76eefdac40ce5e66cc23d9f3e79ad87acbe7ec0c0bb419a7d368ae1e73c85'
            '742871f847bde69c871e8797638e0e270282fb058ef1cbcba52aded9dcc8249b'
            '38fbed8424c33b8cfcde4f49797c64dda8d089d73b84062602fd41c66091543c'
            'e13c18cfa7f8300530ad4b7adb8924bbb86d17bcc5f1d3d74c522a7dcc8c3c1f'
            '28a999f2fe1bfe5520c66f93f7c90996dc7f52e62dd95ace9ceace90324c3040'
            '669b7f5aeb5c5a53f217f1de46e32f80d0aaaf7d9cc9d0e8f8fd7026c612103a'
        )
    )

    anno = aftltool.VBMetaPrimaryAnnotation(
        vbmeta_hash=bytes.fromhex(
            '5623731104bfa1cfdb275df2978d1f93f72f5f0f746f11d06f3091c601c32067'),
        version_incremental='only_for_testing',
        manufacturer_key_hash=bytes.fromhex(
            '83ab3b109b73a1d32dce4153a2de57a1a0485052db8364f3180d98614749d7f7'))
    raw_signature = bytes.fromhex(
        '6a523021bc5b933bb58c38c8238be3a5fe1166002f5df8b77dee9dd22d353595'
        'be7996656d3824ebf4e1411a05ee3652d64669d3d62b167d3290dbdf4f2741ba'
        '4b6472e1bd71fc1860465fdcdca1ff08c4ab0420d7dcbf4ad144f64e211d8f92'
        '081ba51192358e2478195e573d000282423b23e6dd945069907dcf11520ff11a'
        '250e26643b820f8a5d80ccfe7d5d84f58e549cd05630f2254ade8edc88d9aa8a'
        'ec2089f84643854e1f265a4f746598ce4cae529c4eaa637f6e35fa1d1da9254e'
        'ec8dfede7a4313f7b151547dcdde98782ce6fb3149326ee5b8e750813d3fd37a'
        '738fe92f6111bf0dff4091769e216b842980e05716f2e50268a7dcca430e175e'
        '711f80e41a1a28f20635741ac11a56f97492d30db6d1955a827daf8e83faebe5'
        'a96e18a13c558ae561a02c90982514c853db0296c2e791e68b77c30e6232a3b7'
        'ed355441d4706277f33a01735f56cb8279336491731939691683f96f1c3e3183'
        'a0b77510d6ff0199b7688902044829793106546fd6fd4a5294d63c31c91256ad'
        'f7be6d053e77875698ad32ffaaeaac5d54b432e537f72549d2543072ae35578f'
        '138d82afcadd668511ba276ce02b6f9c18ef3b6f2f6ae0d123e9f8cb930f21a9'
        'c49a6d9e95de741c7860593a956735e1b77e9851ecb1f6572abf6e2c8ba15085'
        'e37e0f7bab0a30d108b997ed5edd74cf7f89cf082590a6f0af7a3a1f68c0077a')
    signature = aftltool.Signature(signature=raw_signature)
    signed_anno = aftltool.SignedVBMetaPrimaryAnnotation(annotation=anno,
                                                         signature=signature)
    leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf(
        timestamp=1587991742919072870,
        signed_vbmeta_primary_annotation=signed_anno).encode()
    self.test_avbm_resp.annotation_leaf = leaf

  def _restore_stderr(self):
    """Reconnects sys.stderr and closes the /dev/null handle.

    Called from tearDown() and again as a registered cleanup; the second call
    reassigns the same stream and closes an already closed file, which is a
    no-op.
    """
    sys.stderr = self.stderr
    self.null.close()

  def tearDown(self):
    """Tears down the test bed for the unit tests."""
    # Reconnects stderr back to the normal stderr; see setUp() for details.
    self._restore_stderr()

    super(AftltoolTestCase, self).tearDown()

  def get_testdata_path(self, relative_path):
    """Retrieves the absolute path for testdata given the relative path.

    Arguments:
      relative_path: The relative path to the testdata in the testdata
        directory.

    Returns:
      The absolute path to the testdata.
    """
    rel_path_parts = ['test', 'data']
    rel_path_parts.extend(relative_path.split(os.path.sep))
    return os.path.join(TEST_EXEC_PATH, *rel_path_parts)
421
422
class AftltoolTest(AftltoolTestCase):
  """Tests aftltool.rfc6962_hash_leaf together with aftltool.root_from_icp."""

  def test_merkle_root_hash(self):
    """Tests validation of inclusion proof and the merkle tree calculations.

    The test vectors have been taken from the Trillian tests:
    https://github.com/google/trillian/blob/v1.3.3/merkle/log_verifier_test.go
    """
    unhex = binascii.unhexlify

    # Each row: (1-based leaf number, tree size, proof hashes).
    inclusion_proofs = [
        (1, 8, [
            unhex('96a296d224f285c67bee93c30f8a3091'
                  '57f0daa35dc5b87e410b78630a09cfc7'),
            unhex('5f083f0a1a33ca076a95279832580db3'
                  'e0ef4584bdff1f54c8a360f50de3031e'),
            unhex('6b47aaf29ee3c2af9af889bc1fb9254d'
                  'abd31177f16232dd6aab035ca39bf6e4'),
        ]),
        (6, 8, [
            unhex('bc1a0643b12e4d2d7c77918f44e0f4f7'
                  '9a838b6cf9ec5b5c283e1f4d88599e6b'),
            unhex('ca854ea128ed050b41b35ffc1b87b8eb'
                  '2bde461e9e3b5596ece6b9d5975a0ae0'),
            unhex('d37ee418976dd95753c1c73862b9398f'
                  'a2a2cf9b4ff0fdfe8b30cd95209614b7'),
        ]),
        (3, 3, [
            unhex('fac54203e7cc696cf0dfcb42c92a1d9d'
                  'baf70ad9e621f4bd8d98662f00e3c125'),
        ]),
        (2, 5, [
            unhex('6e340b9cffb37a989ca544e6bb780a2c'
                  '78901d3fb33738768511a30617afa01d'),
            unhex('5f083f0a1a33ca076a95279832580db3'
                  'e0ef4584bdff1f54c8a360f50de3031e'),
            unhex('bc1a0643b12e4d2d7c77918f44e0f4f7'
                  '9a838b6cf9ec5b5c283e1f4d88599e6b'),
        ]),
    ]

    # Raw leaf data; leaf number k from the table above is leaves[k - 1].
    leaves = [unhex(leaf_hex) for leaf_hex in (
        '',
        '00',
        '10',
        '2021',
        '3031',
        '40414243',
        '5051525354555657',
        '606162636465666768696a6b6c6d6e6f',
    )]

    # Expected root hashes; the root for tree size s is roots[s - 1].
    roots = [
        unhex('6e340b9cffb37a989ca544e6bb780a2c'
              '78901d3fb33738768511a30617afa01d'),
        unhex('fac54203e7cc696cf0dfcb42c92a1d9d'
              'baf70ad9e621f4bd8d98662f00e3c125'),
        unhex('aeb6bcfe274b70a14fb067a5e5578264'
              'db0fa9b51af5e0ba159158f329e06e77'),
        unhex('d37ee418976dd95753c1c73862b9398f'
              'a2a2cf9b4ff0fdfe8b30cd95209614b7'),
        unhex('4e3bbb1f7b478dcfe71fb631631519a3'
              'bca12c9aefca1612bfce4c13a86264d4'),
        unhex('76e67dadbcdf1e10e1b74ddc608abd2f'
              '98dfb16fbce75277b5232a127f2087ef'),
        unhex('ddb89be403809e325750d3d263cd7892'
              '9c2942b7942a34b77e122c9594a74c8c'),
        unhex('5dc9da79a70659a9ad559cb701ded9a2'
              'ab9d823aad2f4960cfe370eff4604328'),
    ]

    for leaf_number, tree_size, proof in inclusion_proofs:
      leaf_index = leaf_number - 1
      leaf_hash = aftltool.rfc6962_hash_leaf(leaves[leaf_index])
      computed_root = aftltool.root_from_icp(
          leaf_index, tree_size, proof, leaf_hash)
      self.assertEqual(computed_root, roots[tree_size - 1])
507
508
class AftlImageTest(AftltoolTestCase):
  """Test suite for aftltool.AftlImage."""

  def _load_images(self, image_name):
    """Reads the vbmeta image and the AFTL image from a testdata file.

    Arguments:
      image_name: File name of the image inside the testdata directory.

    Returns:
      A tuple of (vbmeta image, AFTL image) as returned by
      Aftl.get_vbmeta_image and Aftl.get_aftl_image.
    """
    aftl = aftltool.Aftl()
    path = self.get_testdata_path(image_name)
    vbmeta_image, _ = aftl.get_vbmeta_image(path)
    return vbmeta_image, aftl.get_aftl_image(path)

  def _assert_key_handling(self, aftl_image, vbmeta_image):
    """Checks verification against the correct key and unusable key lists."""
    # Valid image checked against correct log key.
    self.assertTrue(aftl_image.verify_vbmeta_image(
        vbmeta_image, [self.get_testdata_path('aftl_pubkey_1.pem')]))

    unusable_key_files = (
        'testkey_rsa4096_pub.pem',  # Key from another log.
        'non_existent_blabli',      # Non-existent key file path.
        'large_blob.bin',           # Invalid key.
    )
    for key_file in unusable_key_files:
      self.assertFalse(aftl_image.verify_vbmeta_image(
          vbmeta_image, [self.get_testdata_path(key_file)]))

    # Valid image checked with empty list of keys.
    self.assertFalse(aftl_image.verify_vbmeta_image(vbmeta_image, []))

  def test__init__(self):
    """Tests the constructor."""
    # Calls constructor without data.
    empty_image = aftltool.AftlImage()
    self.assertIsInstance(empty_image.image_header, aftltool.AftlImageHeader)
    self.assertEqual(empty_image.image_header.icp_count, 0)
    self.assertEqual(empty_image.icp_entries, [])
    self.assertTrue(empty_image.is_valid())

    # Calls constructor with data.
    parsed_image = aftltool.AftlImage(self.test_expected_aftl_image_bytes)
    self.assertIsInstance(parsed_image.image_header, aftltool.AftlImageHeader)
    self.assertEqual(parsed_image.image_header.icp_count, 2)
    self.assertEqual(len(parsed_image.icp_entries), 2)
    for icp_entry in parsed_image.icp_entries:
      self.assertIsInstance(icp_entry, aftltool.AftlIcpEntry)
    self.assertTrue(parsed_image.is_valid())

  def test_add_icp_entry(self):
    """Tests the add_icp_entry method."""
    image = aftltool.AftlImage()

    # Adds the 1st and then the 2nd ICP, checking the bookkeeping each time.
    entries = (self.test_entry_1, self.test_entry_2)
    for expected_count, icp_entry in enumerate(entries, start=1):
      image.add_icp_entry(icp_entry)
      self.assertEqual(image.image_header.icp_count, expected_count)
      self.assertEqual(len(image.icp_entries), expected_count)
      self.assertTrue(image.is_valid())

  def test_verify_vbmeta_image_with_1_icp(self):
    """Tests the verify_vbmeta_image method."""
    # Valid vbmeta image without footer with 1 ICP.
    vbmeta_image, aftl_image = self._load_images(
        'aftl_output_vbmeta_with_1_icp.img')
    self._assert_key_handling(aftl_image, vbmeta_image)

    # Valid image checked with None instead of a list of keys.
    self.assertFalse(aftl_image.verify_vbmeta_image(vbmeta_image, None))

  def test_verify_vbmeta_image_with_2_icp_from_same_log(self):
    """Tests the verify_vbmeta_image method."""
    # Valid vbmeta image without footer with 2 ICPs from same log.
    vbmeta_image, aftl_image = self._load_images(
        'aftl_output_vbmeta_with_2_icp_same_log.img')
    self._assert_key_handling(aftl_image, vbmeta_image)

  def test_encode(self):
    """Tests encode method."""
    self.assertEqual(self.test_aftl_desc.encode(),
                     self.test_expected_aftl_image_bytes)

  def test_is_valid(self):
    """Tests is_valid method."""
    image = aftltool.AftlImage()
    image.add_icp_entry(self.test_entry_1)
    image.add_icp_entry(self.test_entry_2)
    header = image.image_header
    first_entry = image.icp_entries[0]

    # Force invalid ICP header.
    saved_magic = header.magic
    header.magic = b'YOLO'
    self.assertFalse(image.is_valid())
    header.magic = saved_magic
    self.assertTrue(image.is_valid())

    # Force count mismatch between header and actual entries.
    saved_count = header.icp_count
    header.icp_count = 1
    self.assertFalse(image.is_valid())
    header.icp_count = saved_count
    self.assertTrue(image.is_valid())

    # Force invalid ICP entry.
    saved_leaf_index = first_entry.leaf_index
    first_entry.leaf_index = -10
    self.assertFalse(image.is_valid())
    first_entry.leaf_index = saved_leaf_index
    self.assertTrue(image.is_valid())

  def test_print_desc(self):
    """Tests print_desc method."""
    out = io.StringIO()
    self.test_aftl_desc.print_desc(out)
    printed = out.getvalue()

    # Cursory check whether the printed description contains something useful.
    self.assertGreater(len(printed), 0)
    self.assertIn('Log Root Descriptor:', printed)
645
646
class AftlImageHeaderTest(AftltoolTestCase):
  """Test suite for testing the AftlImageHeader descriptor."""

  def setUp(self):
    """Sets up the test bed for the unit tests."""
    super(AftlImageHeaderTest, self).setUp()

    self.test_header_valid = aftltool.AftlImageHeader()
    self.test_header_valid.icp_count = 1

    # A negative ICP count is used as the invalid header fixture.
    self.test_header_invalid = aftltool.AftlImageHeader()
    self.test_header_invalid.icp_count = -34

    # Expected encoding of self.test_header_valid.
    self.test_header_bytes = (
        b'AFTL'                                         # Magic.
        + struct.pack('!L', avbtool.AVB_VERSION_MAJOR)  # Major version.
        + struct.pack('!L', avbtool.AVB_VERSION_MINOR)  # Minor version.
        + b'\x00\x00\x00\x12'                           # Image size.
        b'\x00\x01')                                    # Number of ICP entries.

  def test__init__(self):
    """Tests constructor."""

    # Calls constructor without data.
    header = aftltool.AftlImageHeader()
    self.assertEqual(header.magic, b'AFTL')
    self.assertEqual(header.required_icp_version_major,
                     avbtool.AVB_VERSION_MAJOR)
    self.assertEqual(header.required_icp_version_minor,
                     avbtool.AVB_VERSION_MINOR)
    self.assertEqual(header.aftl_image_size, aftltool.AftlImageHeader.SIZE)
    self.assertEqual(header.icp_count, 0)
    self.assertTrue(header.is_valid())

    # Calls constructor with data.
    header = aftltool.AftlImageHeader(self.test_header_bytes)
    self.assertEqual(header.magic, b'AFTL')
    self.assertEqual(header.required_icp_version_major,
                     avbtool.AVB_VERSION_MAJOR)
    self.assertEqual(header.required_icp_version_minor,
                     avbtool.AVB_VERSION_MINOR)
    self.assertEqual(header.aftl_image_size, aftltool.AftlImageHeader.SIZE)
    # assertEqual, not assertTrue: assertTrue(x, 1) would treat 1 as the
    # failure message and pass for any non-zero count.
    self.assertEqual(header.icp_count, 1)
    self.assertTrue(header.is_valid())

  def test_encode(self):
    """Tests encode method."""
    # Valid header.
    header_bytes = self.test_header_valid.encode()
    self.assertEqual(header_bytes, self.test_header_bytes)

    # Invalid header
    with self.assertRaises(aftltool.AftlError):
      self.test_header_invalid.encode()

  def test_is_valid(self):
    """Tests is_valid method."""
    # Valid default record.
    header = aftltool.AftlImageHeader()
    self.assertTrue(header.is_valid())

    # Invalid magic.
    header = aftltool.AftlImageHeader()
    header.magic = b'YOLO'
    self.assertFalse(header.is_valid())

    # Valid ICP count.
    self.assertTrue(self.test_header_valid.is_valid())

    # Invalid ICP count.
    self.assertFalse(self.test_header_invalid.is_valid())

    header = aftltool.AftlImageHeader()
    header.icp_count = 10000000
    self.assertFalse(header.is_valid())

    # Invalid ICP major version.
    header = aftltool.AftlImageHeader()
    header.required_icp_version_major = avbtool.AVB_VERSION_MAJOR + 1
    self.assertFalse(header.is_valid())

    # Invalid ICP minor version.
    header = aftltool.AftlImageHeader()
    header.required_icp_version_minor = avbtool.AVB_VERSION_MINOR + 1
    self.assertFalse(header.is_valid())

  def test_print_desc(self):
    """Tests print_desc method."""
    buf = io.StringIO()
    self.test_header_valid.print_desc(buf)
    desc = buf.getvalue()

    # Cursory check whether the printed description contains something useful.
    self.assertGreater(len(desc), 0)
    self.assertIn('Major version:', desc)
742
743
class AftlIcpEntryTest(AftltoolTestCase):
  """Test suite for testing the AftlIcpEntry descriptor."""

  def test__init__and_properties(self):
    """Tests constructor and properties methods."""

    # Calls constructor without data.
    entry = aftltool.AftlIcpEntry()
    self.assertEqual(entry.log_url_size, 0)
    self.assertEqual(entry.leaf_index, 0)
    self.assertEqual(entry.log_root_descriptor_size, 29)
    self.assertEqual(entry.annotation_leaf_size, 19)
    self.assertEqual(entry.log_root_sig_size, 0)
    self.assertEqual(entry.proof_hash_count, 0)
    self.assertEqual(entry.inc_proof_size, 0)
    self.assertEqual(entry.log_url, '')
    self.assertIsInstance(entry.log_root_descriptor,
                          aftltool.TrillianLogRootDescriptor)
    self.assertEqual(entry.proofs, [])
    self.assertTrue(entry.is_valid())

    # Calls constructor with data.
    entry = aftltool.AftlIcpEntry(self.test_entry_1_bytes)
    self.assertEqual(entry.log_url_size, len(self.test_tl_url_1))
    self.assertEqual(entry.leaf_index, 1)
    self.assertEqual(entry.annotation_leaf_size, 139)
    self.assertEqual(entry.log_root_sig_size, 512)
    self.assertEqual(entry.proof_hash_count, len(self.test_proof_hashes_1))
    self.assertEqual(entry.inc_proof_size, 128)
    self.assertEqual(entry.log_url, self.test_tl_url_1)
    self.assertEqual(entry.proofs, self.test_proof_hashes_1)
    self.assertTrue(entry.is_valid())

  def test_encode(self):
    """Tests encode method."""
    entry_bytes = self.test_entry_1.encode()
    self.assertEqual(entry_bytes, self.test_entry_1_bytes)

  def test_get_expected_size(self):
    """Tests get_expected_size method."""
    # Default record: the reported size must also match the encoded length.
    entry = aftltool.AftlIcpEntry()
    self.assertEqual(entry.get_expected_size(), 75)
    self.assertEqual(entry.get_expected_size(), len(entry.encode()))

    # Test record.
    self.assertEqual(self.test_entry_1.get_expected_size(), 894)
    self.assertEqual(self.test_entry_1.get_expected_size(),
                     len(self.test_entry_1.encode()))

  def test_is_valid(self):
    """Tests is_valid method."""
    # Valid record populated from the test fixtures.
    entry = aftltool.AftlIcpEntry()
    entry.leaf_index = 2
    entry.log_url = self.test_tl_url_1
    # Assign through the log_root_descriptor attribute that is_valid() is
    # exercised against below. The previous spelling
    # (set_log_root_descriptor) merely created an unrelated attribute, so the
    # fixture descriptor was never installed on the entry.
    # NOTE(review): assumes self.test_sth_1 is a valid
    # TrillianLogRootDescriptor fixture -- confirm against setUp().
    entry.log_root_descriptor = self.test_sth_1
    entry.proofs = self.test_proof_hashes_1
    self.assertTrue(entry.is_valid())

    # Invalid leaf index.
    entry = aftltool.AftlIcpEntry()
    entry.leaf_index = -1
    self.assertFalse(entry.is_valid())

    # Invalid log_root_descriptor: missing, empty, and of the wrong type.
    entry = aftltool.AftlIcpEntry()
    entry.log_root_descriptor = None
    self.assertFalse(entry.is_valid())

    entry.log_root_descriptor = b''
    self.assertFalse(entry.is_valid())

    entry.log_root_descriptor = b'blabli'
    self.assertFalse(entry.is_valid())

  def test_translate_response(self):
    """Tests translate_response method."""
    entry = aftltool.AftlIcpEntry()
    entry.translate_response('aftl-test.foo.bar:80', self.test_avbm_resp)
    self.assertEqual(entry.log_url, 'aftl-test.foo.bar:80')
    self.assertEqual(entry.leaf_index, 9127)
    self.assertEqual(entry.log_root_descriptor.encode(),
                     self.test_avbm_resp.annotation_proof.sth.log_root)
    self.assertEqual(
        entry.log_root_signature,
        self.test_avbm_resp.annotation_proof.sth.log_root_signature)
    self.assertEqual(
        entry.proofs,
        self.test_avbm_resp.annotation_proof.proof.hashes)

  def test_verify_icp(self):
    """Tests verify_icp method."""
    # The log public key is written to a temporary file because verify_icp is
    # handed a key file path.
    with tempfile.NamedTemporaryFile('wt+') as key_file:
      key_file.write(self.test_aftl_pub_key)
      key_file.flush()

      # Valid ICP.
      entry = aftltool.AftlIcpEntry()
      entry.translate_response(self.test_tl_url_1, self.test_avbm_resp)
      self.assertTrue(entry.verify_icp(key_file.name))

      # Invalid ICP where annotation_leaf is not matching up with proofs.
      entry = aftltool.AftlIcpEntry()
      entry.translate_response(self.test_tl_url_1, self.test_avbm_resp)
      vbmeta_hash = entry.annotation_leaf.annotation.vbmeta_hash
      vbmeta_hash = vbmeta_hash.replace(b"\x56\x23\x73\x11",
                                        b"\x00\x00\x00\x00")
      entry.annotation_leaf.annotation.vbmeta_hash = vbmeta_hash
      # Pass the key path exactly as in the valid case above; handing over the
      # file object instead would let this check pass because of the bad key
      # argument rather than because of the tampered hash.
      self.assertFalse(entry.verify_icp(key_file.name))

  def test_verify_vbmeta_image(self):
    """Tests the verify_vbmeta_image method."""
    # Valid vbmeta image without footer with 1 ICP.
    tool = aftltool.Aftl()
    image_path = self.get_testdata_path(
        'aftl_output_vbmeta_with_1_icp.img')
    vbmeta_image, _ = tool.get_vbmeta_image(image_path)
    desc = tool.get_aftl_image(image_path)

    # Checks that there is 1 ICP.
    self.assertEqual(desc.image_header.icp_count, 1)
    entry = desc.icp_entries[0]

    # Valid vbmeta image checked with correct log key.
    self.assertTrue(entry.verify_vbmeta_image(
        vbmeta_image, self.get_testdata_path('aftl_pubkey_1.pem')))

    # Valid vbmeta image checked with public key of another log.
    self.assertFalse(entry.verify_vbmeta_image(
        vbmeta_image, self.get_testdata_path('testkey_rsa4096_pub.pem')))

    # Valid vbmeta image checked with invalid key.
    self.assertFalse(entry.verify_vbmeta_image(
        vbmeta_image, self.get_testdata_path('large_blob.bin')))

    # Valid vbmeta image checked with no key.
    self.assertFalse(entry.verify_vbmeta_image(vbmeta_image, None))

  def test_verify_invalid_vbmeta_image(self):
    """Tests the verify_vbmeta_image method with an invalid vbmeta image."""
    # Starts from a valid vbmeta image without footer with 1 ICP.
    tool = aftltool.Aftl()
    image_path = self.get_testdata_path(
        'aftl_output_vbmeta_with_1_icp.img')
    vbmeta_image, _ = tool.get_vbmeta_image(image_path)
    desc = tool.get_aftl_image(image_path)

    self.assertEqual(desc.image_header.icp_count, 1)
    entry = desc.icp_entries[0]

    # Modify vbmeta image to become invalid; the length is kept unchanged.
    vbmeta_image = b'A' * len(vbmeta_image)

    # Invalid vbmeta image checked with correct log key.
    self.assertFalse(entry.verify_vbmeta_image(
        vbmeta_image, self.get_testdata_path('aftl_pubkey_1.pem')))

    # Invalid vbmeta image checked with invalid key.
    self.assertFalse(entry.verify_vbmeta_image(
        vbmeta_image, self.get_testdata_path('large_blob.bin')))

    # Invalid vbmeta image checked with no key.
    self.assertFalse(entry.verify_vbmeta_image(vbmeta_image, None))

    # None image checked with a key.
    self.assertFalse(entry.verify_vbmeta_image(
        None, self.get_testdata_path('aftl_pubkey_1.pem')))

  def test_print_desc(self):
    """Tests print_desc method."""
    buf = io.StringIO()
    self.test_entry_1.print_desc(buf)
    desc = buf.getvalue()

    # Cursory check whether the printed description contains something useful.
    self.assertGreater(len(desc), 0)
    self.assertIn('ICP hashes:', desc)
923
924
class TrillianLogRootDescriptorTest(AftltoolTestCase):
  """Test suite for testing the TrillianLogRootDescriptor descriptor."""

  def setUp(self):
    """Sets up the test bed for the unit tests."""
    super().setUp()

    # Hex encoding of a log root up to, but excluding, the metadata fields.
    log_root_prefix = ''.join([
        '0001',                              # version
        '00000000000002e5',                  # tree_size
        '20',                                # root_hash_size
        '2d614759ad408a111a3351c0cb33c099',  # root_hash
        '422c30a5c5104788a343332bde2b387b',
        '15e1c97e3b4bd239',                  # timestamp
        '00000000000002e4',                  # revision
    ])

    # Two complete log roots: one with a zero metadata size and one carrying
    # two bytes of metadata.
    self.test_log_root_bytes_wo_metadata = binascii.unhexlify(
        log_root_prefix + '0000')
    self.test_log_root_bytes_with_metadata = binascii.unhexlify(
        log_root_prefix + '00023132')

  def _assert_fields(self, descriptor, expected_fields):
    """Asserts that each (attribute name, value) pair matches descriptor."""
    for field, expected in expected_fields:
      self.assertEqual(getattr(descriptor, field), expected)

  def _assert_invalid_for_each(self, field, bad_values):
    """Asserts a default descriptor is invalid for every value of field.

    The same descriptor instance is reused for all values.
    """
    descriptor = aftltool.TrillianLogRootDescriptor()
    for bad_value in bad_values:
      setattr(descriptor, field, bad_value)
      self.assertFalse(descriptor.is_valid())

  def _assert_size_must_match_payload(self, size_field, payload_field):
    """Asserts validity depends on payload length equalling the size field."""
    descriptor = aftltool.TrillianLogRootDescriptor()
    setattr(descriptor, size_field, 4)
    setattr(descriptor, payload_field, b'123')
    self.assertFalse(descriptor.is_valid())
    setattr(descriptor, payload_field, b'1234')
    self.assertTrue(descriptor.is_valid())

  def _printed_description(self, log_root_bytes):
    """Returns what print_desc writes for a descriptor parsed from bytes."""
    out = io.StringIO()
    descriptor = aftltool.TrillianLogRootDescriptor(log_root_bytes)
    descriptor.print_desc(out)
    return out.getvalue()

  def test__init__(self):
    """Tests constructor."""
    # Constructor without data.
    empty = aftltool.TrillianLogRootDescriptor()
    self.assertTrue(empty.is_valid())
    self._assert_fields(empty, (
        ('version', 1),
        ('tree_size', 0),
        ('root_hash_size', 0),
        ('root_hash', b''),
        ('timestamp', 0),
        ('revision', 0),
        ('metadata_size', 0),
        ('metadata', b''),
    ))

    # Constructor with a log root that has no metadata.
    parsed = aftltool.TrillianLogRootDescriptor(
        self.test_log_root_bytes_wo_metadata)
    self.assertTrue(parsed.is_valid())
    self._assert_fields(parsed, (
        ('version', 1),
        ('tree_size', 741),
        ('root_hash_size', 32),
        ('root_hash',
         binascii.unhexlify('2d614759ad408a111a3351c0cb33c099'
                            '422c30a5c5104788a343332bde2b387b')),
        ('timestamp', 1576762888554271289),
        ('revision', 740),
        ('metadata_size', 0),
        ('metadata', b''),
    ))

    # Constructor with a log root that carries metadata.
    parsed = aftltool.TrillianLogRootDescriptor(
        self.test_log_root_bytes_with_metadata)
    self._assert_fields(parsed, (
        ('metadata_size', 2),
        ('metadata', b'12'),
    ))

  def test_get_expected_size(self):
    """Tests get_expected_size method."""
    # Default constructor.
    empty = aftltool.TrillianLogRootDescriptor()
    self.assertEqual(empty.get_expected_size(), 11 + 18)

    # Parsed log roots, without and then with metadata.
    parsed_cases = (
        (self.test_log_root_bytes_wo_metadata, 11 + 18 + 32),
        (self.test_log_root_bytes_with_metadata, 11 + 18 + 32 + 2),
    )
    for log_root_bytes, expected_size in parsed_cases:
      parsed = aftltool.TrillianLogRootDescriptor(log_root_bytes)
      self.assertEqual(parsed.get_expected_size(), expected_size)

  def test_encode(self):
    """Tests encode method."""
    # Log root from default constructor.
    empty = aftltool.TrillianLogRootDescriptor()
    default_hex = ''.join([
        '0001',                              # version
        '0000000000000000',                  # tree_size
        '00',                                # root_hash_size
        '',                                  # root_hash (empty)
        '0000000000000000',                  # timestamp
        '0000000000000000',                  # revision
        '0000',                              # metadata size
        '',                                  # metadata (empty)
    ])
    self.assertEqual(empty.encode(), binascii.unhexlify(default_hex))

    # Parsed log roots must encode back to the bytes they were parsed from,
    # first without and then with metadata.
    for log_root_bytes in (self.test_log_root_bytes_wo_metadata,
                           self.test_log_root_bytes_with_metadata):
      parsed = aftltool.TrillianLogRootDescriptor(log_root_bytes)
      self.assertEqual(parsed.encode(), log_root_bytes)

  def test_is_valid(self):
    """Tests is_valid method."""
    self.assertTrue(aftltool.TrillianLogRootDescriptor().is_valid())

    # Invalid version, tree_size and root_hash_size.
    self._assert_invalid_for_each('version', (2,))
    self._assert_invalid_for_each('tree_size', (-1,))
    self._assert_invalid_for_each('root_hash_size', (-1, 300))

    # Invalid/valid root_hash_size / root_hash combination.
    self._assert_size_must_match_payload('root_hash_size', 'root_hash')

    # Invalid timestamp, revision and metadata_size.
    self._assert_invalid_for_each('timestamp', (-1,))
    self._assert_invalid_for_each('revision', (-1,))
    self._assert_invalid_for_each('metadata_size', (-1, 70000))

    # Invalid/valid metadata_size / metadata combination.
    self._assert_size_must_match_payload('metadata_size', 'metadata')

  def test_print_desc(self):
    """Tests print_desc method."""
    # Log root without metadata: no metadata line is expected.
    printed = self._printed_description(self.test_log_root_bytes_wo_metadata)
    self.assertGreater(len(printed), 0)
    self.assertIn('Version:', printed)
    self.assertNotIn('Metadata:', printed)

    # Log root with metadata: the metadata line is expected.
    printed = self._printed_description(
        self.test_log_root_bytes_with_metadata)
    self.assertGreater(len(printed), 0)
    self.assertIn('Version:', printed)
    self.assertIn('Metadata:', printed)
1102
1103
class SignedVBMetaPrimaryAnnotationLeafTest(AftltoolTestCase):
  """Test suite for testing the Leaf."""

  def test__init__(self):
    """Tests constructor and properties methods."""
    # Constructor without data.
    empty_leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf()
    self.assertEqual(empty_leaf.version, 1)
    self.assertEqual(empty_leaf.timestamp, 0)
    self.assertEqual(empty_leaf.signature.signature, b'')
    self.assertEqual(empty_leaf.annotation.vbmeta_hash, b'')
    self.assertEqual(empty_leaf.annotation.description, '')

  def test_parse(self):
    """Tests parse method."""
    # Valid data: the annotation fields come back as stored in the fixture.
    parsed = aftltool.SignedVBMetaPrimaryAnnotationLeaf.parse(
        self.test_anno_1_bytes)
    self.assertEqual(parsed.annotation.vbmeta_hash, b'w' * 32)
    self.assertEqual(parsed.annotation.version_incremental, 'x' * 5)
    self.assertEqual(parsed.annotation.manufacturer_key_hash, b'y' * 32)
    self.assertEqual(parsed.annotation.description, 'z' * 51)

    # Invalid data is expected to raise.
    self.assertRaises(aftltool.AftlError,
                      aftltool.SignedVBMetaPrimaryAnnotationLeaf.parse,
                      b'Invalid data')

  def test_get_expected_size(self):
    """Tests get_expected_size method."""
    # Constructor without data.
    empty_leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf()
    self.assertEqual(empty_leaf.get_expected_size(), 19)

    # A parsed leaf reports the length of the bytes it was parsed from.
    source_bytes = self.test_anno_1_bytes
    parsed = aftltool.SignedVBMetaPrimaryAnnotationLeaf.parse(source_bytes)
    self.assertEqual(parsed.get_expected_size(), len(source_bytes))

  def test_encode(self):
    """Tests encode method."""
    # The fixture leaf must encode to the fixture bytes.
    encoded = self.test_anno_1.encode()
    self.assertEqual(encoded, self.test_anno_1_bytes)

  def test_print_desc(self):
    """Tests print_desc method."""
    out = io.StringIO()
    self.test_anno_1.print_desc(out)
    printed = out.getvalue()

    # Sanity check only: the output is non-empty and carries a known label.
    self.assertGreater(len(printed), 0)
    self.assertIn('VBMeta hash:', printed)
1157
1158
class AftlMockCommunication(aftltool.AftlCommunication):
  """Testing Mock implementation of AftlCommunication."""

  def __init__(self, transparency_log_config, canned_response):
    """Initializes the object.

    Arguments:
      transparency_log_config: An aftltool.TransparencyLogConfig instance.
      canned_response: AddVBMetaResponse to return or the Exception to
        raise.
    """
    super().__init__(transparency_log_config, timeout=None)
    # The most recent request handed to add_vbmeta(); None until then.
    self.request = None
    self.canned_response = canned_response

  def add_vbmeta(self, request):
    """Records the request and returns the canned response.

    Arguments:
      request: The request to record; it is stored on self.request.

    Returns:
      The canned response, unless it is an aftltool.AftlError.

    Raises:
      AftlError: If the canned response is an aftltool.AftlError instance.
    """
    self.request = request

    outcome = self.canned_response
    if not isinstance(outcome, aftltool.AftlError):
      return outcome
    raise outcome
1182
1183
class AftlMock(aftltool.Aftl):
  """Mock for aftltool.Aftl to mock the communication piece."""

  def __init__(self, canned_response):
    """Initializes the object.

    Arguments:
      canned_response: AddVBMetaResponse to return or the Exception to
        raise.
    """
    # NOTE(review): the base class initializer is not invoked here; confirm
    # that aftltool.Aftl needs no initialization of its own.
    self.mock_canned_response = canned_response

  def request_inclusion_proof(self, transparency_log_config, vbmeta_image,
                              version_inc, manufacturer_key_path,
                              signing_helper, signing_helper_with_files,
                              timeout, aftl_comms=None):
    """Mocked request_inclusion_proof function.

    Any aftl_comms passed by the caller is replaced with an
    AftlMockCommunication serving the canned response; all other arguments
    are forwarded unchanged to the base class implementation.
    """
    mock_comms = AftlMockCommunication(transparency_log_config,
                                       self.mock_canned_response)
    base_request = super().request_inclusion_proof
    return base_request(
        transparency_log_config, vbmeta_image, version_inc,
        manufacturer_key_path, signing_helper, signing_helper_with_files,
        timeout, aftl_comms=mock_comms)
1207
1208
class AftlTestCase(AftltoolTestCase):
  """Base class for aftltool.Aftl tests with configurable environments.

  Subclasses provide the environment through set_up_environment() and the
  Aftl implementation under test through get_aftl_implementation(). setUp()
  then derives default keyword arguments for the commands under test.
  """

  def setUp(self):
    """Sets up the test bed for the unit tests."""
    super(AftlTestCase, self).setUp()

    # Sets up the member variables which are then configured by
    # set_up_environment() in the subclasses.
    self.aftl_host = None
    self.aftl_pubkey = None
    self.aftl_apikey = None
    self.vbmeta_image = None
    self.manufacturer_key = None
    self.set_up_environment()

    self.transparency_log_config = aftltool.TransparencyLogConfig(
        self.aftl_host, self.aftl_pubkey, self.aftl_apikey)

    # Default keyword arguments for make_icp_from_vbmeta(); tests fill in
    # 'output' before calling.
    self.make_icp_default_params = {
        'vbmeta_image_path': self.vbmeta_image,
        'output': None,
        'signing_helper': None,
        'signing_helper_with_files': None,
        'version_incremental': '1',
        'transparency_log_configs': [self.transparency_log_config],
        'manufacturer_key': self.manufacturer_key,
        'padding_size': 0,
        'timeout': None
    }

    # Default keyword arguments for info_image_icp(); tests fill in
    # 'vbmeta_image_path' before calling.
    self.info_icp_default_params = {
        'vbmeta_image_path': None,
        'output': io.StringIO()
    }

    # Default keyword arguments for verify_image_icp(); tests fill in
    # 'vbmeta_image_path' before calling.
    self.verify_icp_default_params = {
        'vbmeta_image_path': None,
        'transparency_log_pub_keys': [self.aftl_pubkey],
        'output': io.StringIO()
    }

    # Default keyword arguments for load_test_aftl().
    self.load_test_aftl_default_params = {
        'vbmeta_image_path': self.vbmeta_image,
        'output': io.StringIO(),
        'transparency_log_config': self.transparency_log_config,
        'manufacturer_key': self.manufacturer_key,
        'process_count': 1,
        'submission_count': 1,
        'stats_filename': None,
        'preserve_icp_images': False,
        'timeout': None
    }

  def set_up_environment(self):
    """Sets up member variables for the particular test environment.

    This allows to have different settings and mocking for unit tests and
    integration tests.

    Raises:
      NotImplementedError if subclass is not implementing the method.
    """
    raise NotImplementedError('set_up_environment() needs to be implemented '
                              'by subclass.')

  def get_aftl_implementation(self, canned_response):
    """Gets the aftltool.Aftl implementation used for testing.

    This allows to have different Aftl implementations for unit tests and
    integration tests.

    Arguments:
      canned_response: The response (or error) a mocked implementation should
        serve. Implementations that use the actual service instead of a mock
        may ignore it.

    Raises:
      NotImplementedError if subclass is not implementing the method.
    """
    # The trailing space keeps the two literals from running together as
    # "needs to beimplemented".
    raise NotImplementedError('get_aftl_implementation() needs to be '
                              'implemented by subclass.')
1286
1287
1288class AftlTest(AftlTestCase):
1289
1290  def set_up_environment(self):
1291    """Sets up the environment for unit testing without networking."""
1292    self.aftl_host = 'test.foo.bar:9000'
1293    self.aftl_pubkey = self.get_testdata_path('aftl_pubkey_1.pem')
1294    self.vbmeta_image = self.get_testdata_path('aftl_input_vbmeta.img')
1295    self.manufacturer_key = self.get_testdata_path('testkey_rsa4096.pem')
1296
1297  def get_aftl_implementation(self, canned_response):
1298    """Retrieves the AftlMock for unit testing without networking."""
1299    return AftlMock(canned_response)
1300
1301  def test_get_vbmeta_image(self):
1302    """Tests the get_vbmeta_image method."""
1303    tool = aftltool.Aftl()
1304
1305    # Valid vbmeta image without footer and AftlImage.
1306    image, footer = tool.get_vbmeta_image(
1307        self.get_testdata_path('aftl_input_vbmeta.img'))
1308    self.assertIsNotNone(image)
1309    self.assertEqual(len(image), 4352)
1310    self.assertIsNone(footer)
1311
1312    # Valid vbmeta image without footer but with AftlImage.
1313    image, footer = tool.get_vbmeta_image(
1314        self.get_testdata_path('aftl_output_vbmeta_with_1_icp.img'))
1315    self.assertIsNotNone(image)
1316    self.assertEqual(len(image), 4352)
1317    self.assertIsNone(footer)
1318
1319    # Invalid vbmeta image.
1320    image, footer = tool.get_vbmeta_image(
1321        self.get_testdata_path('large_blob.bin'))
1322    self.assertIsNone(image)
1323    self.assertIsNone(footer)
1324
1325    # Invalid file path.
1326    image, footer = tool.get_vbmeta_image(
1327        self.get_testdata_path('blabli_not_existing_file'))
1328    self.assertIsNone(image)
1329    self.assertIsNone(footer)
1330
1331  def test_get_aftl_image(self):
1332    """Tests the get_aftl_image method."""
1333    tool = aftltool.Aftl()
1334
1335    # Valid vbmeta image without footer with AftlImage.
1336    desc = tool.get_aftl_image(
1337        self.get_testdata_path('aftl_output_vbmeta_with_1_icp.img'))
1338    self.assertIsInstance(desc, aftltool.AftlImage)
1339
1340    # Valid vbmeta image without footer and AftlImage.
1341    desc = tool.get_aftl_image(
1342        self.get_testdata_path('aftl_input_vbmeta.img'))
1343    self.assertIsNone(desc)
1344
1345    # Invalid vbmeta image.
1346    desc = tool.get_aftl_image(self.get_testdata_path('large_blob.bin'))
1347    self.assertIsNone(desc)
1348
1349  # pylint: disable=no-member
1350  def test_request_inclusion_proof(self):
1351    """Tests the request_inclusion_proof method."""
1352    # Always work with a mock independent if run as unit or integration tests.
1353    aftl = AftlMock(self.test_avbm_resp)
1354
1355    icp = aftl.request_inclusion_proof(
1356        self.transparency_log_config, b'a' * 1024, '1',
1357        self.get_testdata_path('testkey_rsa4096.pem'), None, None, None)
1358    self.assertEqual(icp.leaf_index,
1359                     self.test_avbm_resp.annotation_proof.proof.leaf_index)
1360    self.assertEqual(icp.proof_hash_count,
1361                     len(self.test_avbm_resp.annotation_proof.proof.hashes))
1362    self.assertEqual(icp.log_url, self.aftl_host)
1363    self.assertEqual(
1364        icp.log_root_descriptor.root_hash, binascii.unhexlify(
1365            '9a5f71340f8dc98bdc6320f976dda5f34db8554cb273ba5ab60f1697c519d6f6'))
1366
1367    self.assertEqual(icp.annotation_leaf.annotation.version_incremental,
1368                     'only_for_testing')
1369    # To calculate the hash of the a RSA key use the following command:
1370    # openssl rsa -in test/data/testkey_rsa4096.pem -pubout \
1371    #    -outform DER | sha256sum
1372    self.assertEqual(
1373        icp.annotation_leaf.annotation.manufacturer_key_hash,
1374        bytes.fromhex(
1375            "83ab3b109b73a1d32dce4153a2de57a1a0485052db8364f3180d98614749d7f7"))
1376
1377    self.assertEqual(
1378        icp.log_root_signature,
1379        self.test_avbm_resp.annotation_proof.sth.log_root_signature)
1380    self.assertEqual(
1381        icp.proofs,
1382        self.test_avbm_resp.annotation_proof.proof.hashes)
1383
1384  # pylint: disable=no-member
1385  def test_request_inclusion_proof_failure(self):
1386    """Tests the request_inclusion_proof method in case of a comms problem."""
1387    # Always work with a mock independent if run as unit or integration tests.
1388    aftl = AftlMock(aftltool.AftlError('Comms error'))
1389
1390    with self.assertRaises(aftltool.AftlError):
1391      aftl.request_inclusion_proof(
1392          self.transparency_log_config, b'a' * 1024, 'version_inc',
1393          self.get_testdata_path('testkey_rsa4096.pem'), None, None, None)
1394
1395  def test_request_inclusion_proof_manuf_key_not_4096(self):
1396    """Tests request_inclusion_proof with manufacturing key not of size 4096."""
1397    # Always work with a mock independent if run as unit or integration tests.
1398    aftl = AftlMock(self.test_avbm_resp)
1399    with self.assertRaises(aftltool.AftlError) as e:
1400      aftl.request_inclusion_proof(
1401          self.transparency_log_config, b'a' * 1024, 'version_inc',
1402          self.get_testdata_path('testkey_rsa2048.pem'), None, None, None)
1403    self.assertIn('not of size 4096: 2048', str(e.exception))
1404
1405  def test_make_and_verify_icp_with_1_log(self):
1406    """Tests make_icp_from_vbmeta, verify_image_icp & info_image_icp."""
1407    aftl = self.get_aftl_implementation(self.test_avbm_resp)
1408
1409    # Make a VBmeta image with ICP.
1410    with tempfile.NamedTemporaryFile('wb+') as output_file:
1411      self.make_icp_default_params['output'] = output_file
1412      result = aftl.make_icp_from_vbmeta(**self.make_icp_default_params)
1413      output_file.flush()
1414      self.assertTrue(result)
1415
1416      # Checks that there is 1 ICP.
1417      aftl_image = aftl.get_aftl_image(output_file.name)
1418      self.assertEqual(aftl_image.image_header.icp_count, 1)
1419
1420      # Verifies the generated image.
1421      self.verify_icp_default_params['vbmeta_image_path'] = output_file.name
1422      result = aftl.verify_image_icp(**self.verify_icp_default_params)
1423      self.assertTrue(result)
1424
1425      # Prints the image details.
1426      self.info_icp_default_params['vbmeta_image_path'] = output_file.name
1427      result = aftl.info_image_icp(**self.info_icp_default_params)
1428      self.assertTrue(result)
1429
1430  def test_make_and_verify_icp_with_2_logs(self):
1431    """Tests make_icp_from_vbmeta, verify_image_icp & info_image_icp."""
1432    aftl = self.get_aftl_implementation(self.test_avbm_resp)
1433
1434    # Reconfigures default parameters with two transparency logs.
1435    self.make_icp_default_params['transparency_log_configs'] = [
1436        self.transparency_log_config, self.transparency_log_config]
1437
1438    # Make a VBmeta image with ICP.
1439    with tempfile.NamedTemporaryFile('wb+') as output_file:
1440      self.make_icp_default_params['output'] = output_file
1441      result = aftl.make_icp_from_vbmeta(
1442          **self.make_icp_default_params)
1443      output_file.flush()
1444      self.assertTrue(result)
1445
1446      # Checks that there are 2 ICPs.
1447      aftl_image = aftl.get_aftl_image(output_file.name)
1448      self.assertEqual(aftl_image.image_header.icp_count, 2)
1449
1450      # Verifies the generated image.
1451      self.verify_icp_default_params['vbmeta_image_path'] = output_file.name
1452      result = aftl.verify_image_icp(**self.verify_icp_default_params)
1453      self.assertTrue(result)
1454
1455      # Prints the image details.
1456      self.info_icp_default_params['vbmeta_image_path'] = output_file.name
1457      result = aftl.info_image_icp(**self.info_icp_default_params)
1458      self.assertTrue(result)
1459
1460  def test_info_image_icp(self):
1461    """Tests info_image_icp with vbmeta image with 2 ICP."""
1462    # Always work with a mock independent if run as unit or integration tests.
1463    aftl = AftlMock(self.test_avbm_resp)
1464
1465    image_path = self.get_testdata_path(
1466        'aftl_output_vbmeta_with_2_icp_same_log.img')
1467    self.info_icp_default_params['vbmeta_image_path'] = image_path
1468
1469    # Verifies the generated image.
1470    result = aftl.info_image_icp(**self.info_icp_default_params)
1471    self.assertTrue(result)
1472
1473  def test_info_image_icp_fail(self):
1474    """Tests info_image_icp with invalid vbmeta image."""
1475    # Always work with a mock independent if run as unit or integration tests.
1476    aftl = AftlMock(self.test_avbm_resp)
1477
1478    image_path = self.get_testdata_path('large_blob.bin')
1479    self.info_icp_default_params['vbmeta_image_path'] = image_path
1480
1481    # Verifies the generated image.
1482    result = aftl.info_image_icp(**self.info_icp_default_params)
1483    self.assertFalse(result)
1484
1485  def test_verify_image_icp(self):
1486    """Tets verify_image_icp with 2 ICP with all matching log keys."""
1487    # Always work with a mock independent if run as unit or integration tests.
1488    aftl = AftlMock(self.test_avbm_resp)
1489
1490    image_path = self.get_testdata_path(
1491        'aftl_output_vbmeta_with_2_icp_same_log.img')
1492    self.verify_icp_default_params['vbmeta_image_path'] = image_path
1493    self.verify_icp_default_params['transparency_log_pub_keys'] = [
1494        self.get_testdata_path('aftl_pubkey_1.pem'),
1495    ]
1496
1497    result = aftl.verify_image_icp(**self.verify_icp_default_params)
1498    self.assertTrue(result)
1499
1500  def test_make_icp_with_invalid_grpc_service(self):
1501    """Tests make_icp_from_vbmeta command with a host not supporting GRPC."""
1502    aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error'))
1503    self.make_icp_default_params[
1504        'transparency_log_configs'][0].target = 'www.google.com:80'
1505    with tempfile.NamedTemporaryFile('wb+') as output_file:
1506      self.make_icp_default_params['output'] = output_file
1507      result = aftl.make_icp_from_vbmeta(
1508          **self.make_icp_default_params)
1509      self.assertFalse(result)
1510
1511  def test_make_icp_grpc_timeout(self):
1512    """Tests make_icp_from_vbmeta command when running into GRPC timeout."""
1513    aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error'))
1514
1515    # The timeout is set to 1 second which is way below the minimum processing
1516    # time of the transparency log per load test results in b/139407814#2 where
1517    # it was 3.43 seconds.
1518    self.make_icp_default_params['timeout'] = 1
1519    with tempfile.NamedTemporaryFile('wb+') as output_file:
1520      self.make_icp_default_params['output'] = output_file
1521      result = aftl.make_icp_from_vbmeta(
1522          **self.make_icp_default_params)
1523      self.assertFalse(result)
1524
1525  def test_load_test_single_process_single_submission(self):
1526    """Tests load_test_aftl command with 1 process which does 1 submission."""
1527    aftl = self.get_aftl_implementation(self.test_avbm_resp)
1528
1529    with tempfile.TemporaryDirectory() as tmp_dir:
1530      self.load_test_aftl_default_params[
1531          'stats_filename'] = os.path.join(tmp_dir, 'load_test.csv')
1532      result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
1533      self.assertTrue(result)
1534
1535      output = self.load_test_aftl_default_params['output'].getvalue()
1536      self.assertRegex(output, 'Succeeded:.+?1\n')
1537      self.assertRegex(output, 'Failed:.+?0\n')
1538
1539      self.assertTrue(os.path.exists(
1540          self.load_test_aftl_default_params['stats_filename']))
1541
1542  def test_load_test_multi_process_multi_submission(self):
1543    """Tests load_test_aftl command with 2 processes and 2 submissions each."""
1544    aftl = self.get_aftl_implementation(self.test_avbm_resp)
1545
1546    self.load_test_aftl_default_params['process_count'] = 2
1547    self.load_test_aftl_default_params['submission_count'] = 2
1548    with tempfile.TemporaryDirectory() as tmp_dir:
1549      self.load_test_aftl_default_params[
1550          'stats_filename'] = os.path.join(tmp_dir, 'load_test.csv')
1551      result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
1552      self.assertTrue(result)
1553
1554      output = self.load_test_aftl_default_params['output'].getvalue()
1555      self.assertRegex(output, 'Succeeded:.+?4\n')
1556      self.assertRegex(output, 'Failed:.+?0\n')
1557
1558      self.assertTrue(os.path.exists(
1559          self.load_test_aftl_default_params['stats_filename']))
1560
1561  def test_load_test_invalid_grpc_service(self):
1562    """Tests load_test_aftl command with a host that does not support GRPC."""
1563    aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error'))
1564
1565    self.load_test_aftl_default_params[
1566        'transparency_log_config'].target = 'www.google.com:80'
1567    result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
1568    self.assertFalse(result)
1569
1570    output = self.load_test_aftl_default_params['output'].getvalue()
1571    self.assertRegex(output, 'Succeeded:.+?0\n')
1572    self.assertRegex(output, 'Failed:.+?1\n')
1573
1574  def test_load_test_grpc_timeout(self):
1575    """Tests load_test_aftl command when running into timeout."""
1576    aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error'))
1577
1578    self.load_test_aftl_default_params['timeout'] = 1
1579    result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
1580    self.assertFalse(result)
1581
1582    output = self.load_test_aftl_default_params['output'].getvalue()
1583    self.assertRegex(output, 'Succeeded:.+?0\n')
1584    self.assertRegex(output, 'Failed:.+?1\n')
1585
1586
class TransparencyLogConfigTestCase(unittest.TestCase):
  """Tests for TransparencyLogConfig.from_argument."""

  def test_from_argument(self):
    """Tests from_argument with 'target,key' input and malformed input."""
    parse = aftltool.TransparencyLogConfig.from_argument

    config = parse("example.com:8080,mykey.pub")
    self.assertEqual(config.target, "example.com:8080")
    self.assertEqual(config.pub_key, "mykey.pub")

    # An empty key part, or nothing but the separator, must be rejected.
    for malformed in ("example.com:8080,", ","):
      with self.assertRaises(argparse.ArgumentTypeError):
        parse(malformed)

  def test_from_argument_with_api_key(self):
    """Tests from_argument with 'target,key,api_key' input."""
    config = aftltool.TransparencyLogConfig.from_argument(
        "example.com:8080,mykey.pub,Aipl29gj3x9")

    self.assertEqual(config.target, "example.com:8080")
    self.assertEqual(config.pub_key, "mykey.pub")
    self.assertEqual(config.api_key, "Aipl29gj3x9")
1607
# Runs all test cases of this module with verbose output when the file is
# executed as a script.
if __name__ == '__main__':
  unittest.main(verbosity=2)
1610