1#!/usr/bin/env python3 2 3# Copyright 2019, The Android Open Source Project 4# 5# Permission is hereby granted, free of charge, to any person 6# obtaining a copy of this software and associated documentation 7# files (the "Software"), to deal in the Software without 8# restriction, including without limitation the rights to use, copy, 9# modify, merge, publish, distribute, sublicense, and/or sell copies 10# of the Software, and to permit persons to whom the Software is 11# furnished to do so, subject to the following conditions: 12# 13# The above copyright notice and this permission notice shall be 14# included in all copies or substantial portions of the Software. 15# 16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 20# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 21# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 22# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 23# SOFTWARE. 24# 25"""Unit tests for aftltool.""" 26 27import argparse 28import binascii 29import io 30import os 31import struct 32import sys 33import tempfile 34import unittest 35 36import aftltool 37import avbtool 38 39# pylint: disable=import-error 40import api_pb2 41# pylint: enable=import-error 42 43 44# Workaround for b/149307145 in order to pick up the test data from the right 45# location independent where the script is called from. 46# TODO(b/149307145): Remove workaround once the referenced bug is fixed. 47TEST_EXEC_PATH = os.path.dirname(os.path.realpath(__file__)) 48 49class TlsDataTest(unittest.TestCase): 50 51 def test_decode(self): 52 data = io.BytesIO(b'\x01\x02') 53 value = aftltool.tls_decode_bytes('B', data) 54 self.assertEqual(value, b'\x02') 55 self.assertEqual(data.read(), b'') 56 57 data = io.BytesIO(b'\x00\x01\x03\xff') 58 value = aftltool.tls_decode_bytes('H', data) 59 self.assertEqual(value, b'\x03') 60 self.assertEqual(data.read(), b'\xff') 61 62 data = io.BytesIO(b'\x00\x00\x00\x02\x04\x05\xff\xff') 63 value = aftltool.tls_decode_bytes('L', data) 64 self.assertEqual(value, b'\x04\x05') 65 self.assertEqual(data.read(), b'\xff\xff') 66 67 def test_decode_invalid(self): 68 # Insufficient data for reading the size. 69 with self.assertRaises(aftltool.AftlError): 70 aftltool.tls_decode_bytes('B', io.BytesIO(b'')) 71 72 # Invalid byte_size character. 73 with self.assertRaises(aftltool.AftlError): 74 aftltool.tls_decode_bytes('/o/', io.BytesIO(b'\x01\x02\xff')) 75 76 # Insufficient data for reading the value. 77 with self.assertRaises(aftltool.AftlError): 78 aftltool.tls_decode_bytes('B', io.BytesIO(b'\x01')) 79 80 def test_encode(self): 81 stream = io.BytesIO() 82 aftltool.tls_encode_bytes('B', b'\x01\x02\x03\x04', stream) 83 self.assertEqual(stream.getvalue(), b'\x04\x01\x02\x03\x04') 84 85 stream = io.BytesIO() 86 aftltool.tls_encode_bytes('H', b'\x01\x02\x03\x04', stream) 87 self.assertEqual(stream.getvalue(), b'\x00\x04\x01\x02\x03\x04') 88 89 def test_encode_invalid(self): 90 # Byte size is not large enough to encode the value. 91 stream = io.BytesIO() 92 with self.assertRaises(aftltool.AftlError): 93 aftltool.tls_encode_bytes('B', b'\x01'*256, stream) 94 95 # Invalid byte_size character. 96 stream = io.BytesIO() 97 with self.assertRaises(aftltool.AftlError): 98 aftltool.tls_encode_bytes('/o/', b'\x01\x02', stream) 99 100 101class VBMetaPrimaryAnnotationTest(unittest.TestCase): 102 103 def test_decode(self): 104 stream = io.BytesIO(b'\x00\x00\x00\x00\x00') 105 anno = aftltool.VBMetaPrimaryAnnotation.parse(stream) 106 self.assertEqual(anno.vbmeta_hash, b'') 107 self.assertEqual(anno.version_incremental, '') 108 self.assertEqual(anno.manufacturer_key_hash, b'') 109 self.assertEqual(anno.description, '') 110 111 def test_encode(self): 112 stream = io.BytesIO() 113 anno = aftltool.VBMetaPrimaryAnnotation() 114 anno.encode(stream) 115 self.assertEqual(stream.getvalue(), b'\x00\x00\x00\x00\x00') 116 117 def test_encode_invalid(self): 118 stream = io.BytesIO() 119 anno = aftltool.VBMetaPrimaryAnnotation() 120 # Version incremental should be ASCII only. 121 anno.version_incremental = '☃' 122 with self.assertRaises(aftltool.AftlError): 123 anno.encode(stream) 124 125 126class SignedVBMetaAnnotationLeafTest(unittest.TestCase): 127 128 def test_encode(self): 129 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf() 130 self.assertEqual(leaf.encode(), 131 b'\x01' # Version 132 b'\x00\x00\x00\x00\x00\x00\x00\x00' # Timestamp 133 b'\x01' + # Leaf Type 134 b'\x00' * 4 + # Empty Signature 135 b'\x00' * 5) # Empty Annotation 136 137 def test_encode_invalid_type(self): 138 # The version field must be a 1-byte integer. 139 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf() 140 leaf.version = 'x' 141 with self.assertRaises(aftltool.AftlError): 142 leaf.encode() 143 144 def test_encode_invalid_size(self): 145 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf() 146 leaf.version = 256 147 with self.assertRaises(aftltool.AftlError): 148 leaf.encode() 149 150 151class AftltoolTestCase(unittest.TestCase): 152 153 def setUp(self): 154 """Sets up the test bed for the unit tests.""" 155 super(AftltoolTestCase, self).setUp() 156 157 # Redirects the stderr to /dev/null when running the unittests. The reason 158 # is that soong interprets any output on stderr as an error and marks the 159 # unit test as failed although the test itself succeeded. 160 self.stderr = sys.stderr 161 self.null = open(os.devnull, 'wt') 162 sys.stderr = self.null 163 164 # AFTL public key. 165 self.test_aftl_pub_key = ( 166 '-----BEGIN PUBLIC KEY-----\n' 167 'MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA4ilqCNsenNA013iCdwgD\n' 168 'YPxZ853nbHG9lMBp9boXiwRcqT/8bUKHIL7YX5z7s+QoRYVY3rkMKppRabclXzyx\n' 169 'H59YnPMaU4uv7NqwWzjgaZo7E+vo7IF+KBjV3cJulId5Av0yIYUCsrwd7MpGtWdC\n' 170 'Q3S+7Vd4zwzCKEhcvliNIhnNlp1U3wNkPCxOyCAsMEn6k8O5ar12ke5TvxDv15db\n' 171 'rPDeHh8G2OYWoCkWL+lSN35L2kOJqKqVbLKWrrOd96RCYrrtbPCi580OADJRcUlG\n' 172 'lgcjwmNwmypBWvQMZ6ITj0P0ksHnl1zZz1DE2rXe1goLI1doghb5KxLaezlR8c2C\n' 173 'E3w/uo9KJgNmNgUVzzqZZ6FE0moyIDNOpP7KtZAL0DvEZj6jqLbB0ccPQElrg52m\n' 174 'Dv2/A3nYSr0mYBKeskT4+Bg7PGgoC8p7WyLSxMyzJEDYdtrj9OFx6eZaA23oqTQx\n' 175 'k3Qq5H8RfNBeeSUEeKF7pKH/7gyqZ2bNzBFMA2EBZgBozwRfaeN/HCv3qbaCnwvu\n' 176 '6caacmAsK+RxiYxSL1QsJqyhCWWGxVyenmxdc1KG/u5ypi7OIioztyzR3t2tAzD3\n' 177 'Nb+2t8lgHBRxbV24yiPlnvPmB1ZYEctXnlRR9Evpl1o9xA9NnybPHKr9rozN39CZ\n' 178 'V/USB8K6ao1y5xPZxa8CZksCAwEAAQ==\n' 179 '-----END PUBLIC KEY-----\n') 180 181 # Test AftlIcpEntry #1 182 self.test_tl_url_1 = 'aftl-test-server.google.com' 183 184 self.test_sth_1 = aftltool.TrillianLogRootDescriptor() 185 self.test_sth_1.tree_size = 2 186 self.test_sth_1.root_hash_size = 32 187 self.test_sth_1.root_hash = b'f' * 32 188 self.test_sth_1.timestamp = 0x1234567890ABCDEF 189 self.test_sth_1.revision = 0xFEDCBA0987654321 190 191 self.test_sth_1_bytes = ( 192 b'\x00\x01' # version 193 b'\x00\x00\x00\x00\x00\x00\x00\x02' # tree_size 194 b'\x20' # root_hash_size 195 + b'f' * 32 + # root_hash 196 b'\x12\x34\x56\x78\x90\xAB\xCD\xEF' # timestamp 197 b'\xFE\xDC\xBA\x09\x87\x65\x43\x21' # revision 198 b'\x00\x00' # metadata_size 199 b'' # metadata (empty) 200 ) 201 202 # Test Annotation #1 203 anno_1 = aftltool.VBMetaPrimaryAnnotation(vbmeta_hash=b'w'*32, 204 version_incremental='x'*5, 205 manufacturer_key_hash=b'y'*32, 206 description='z'*51) 207 signed_anno_1 = aftltool.SignedVBMetaPrimaryAnnotation(annotation=anno_1) 208 209 self.test_anno_1 = aftltool.SignedVBMetaPrimaryAnnotationLeaf( 210 signed_vbmeta_primary_annotation=signed_anno_1) 211 self.test_anno_1_bytes = ( 212 b'\x01' # version 213 b'\x00\x00\x00\x00\x00\x00\x00\x00' # timestamp 214 b'\x01' # leaf_type 215 b'\x00' # hash_algorithm 216 b'\x00' # signature_algorithm 217 + b'\x00\x00' # signature 218 + b'\x20' + b'w' * 32 # vbmeta_hash 219 + b'\x05' + b'x' * 5 # version_incremental 220 + b'\x20' + b'y' * 32 # manufacturer_key_hash 221 + b'\x00\x33' + b'z' * 51 # description 222 ) 223 224 # Fill each structure with an easily observable pattern for easy validation. 225 self.test_proof_hashes_1 = [] 226 self.test_proof_hashes_1.append(b'b' * 32) 227 self.test_proof_hashes_1.append(b'c' * 32) 228 self.test_proof_hashes_1.append(b'd' * 32) 229 self.test_proof_hashes_1.append(b'e' * 32) 230 231 # Valid test AftlIcpEntry #1. 232 self.test_entry_1 = aftltool.AftlIcpEntry() 233 self.test_entry_1.log_url = self.test_tl_url_1 234 self.test_entry_1.leaf_index = 1 235 self.test_entry_1.annotation_leaf = self.test_anno_1 236 self.test_entry_1.log_root_descriptor = self.test_sth_1 237 self.test_entry_1.proofs = self.test_proof_hashes_1 238 self.test_entry_1.log_root_signature = b'g' * 512 239 240 self.test_entry_1_bytes = ( 241 b'\x00\x00\x00\x1b' # Transparency log url size. 242 b'\x00\x00\x00\x00\x00\x00\x00\x01' # Leaf index. 243 b'\x00\x00\x00\x3d' # Log root descriptor size. 244 b'\x00\x00\x00\x8b' # Annotation leaf size. 245 b'\x02\x00' # Log root signature size. 246 b'\x04' # Number of hashes in ICP. 247 b'\x00\x00\x00\x80' # Size of ICP in bytes. 248 + self.test_tl_url_1.encode('ascii') # Transparency log url. 249 + self.test_sth_1_bytes 250 + self.test_anno_1_bytes 251 + b'g' * 512 # Log root signature. 252 + b'b' * 32 # Hashes... 253 + b'c' * 32 254 + b'd' * 32 255 + b'e' * 32) 256 257 # Valid test AftlIcpEntry #2. 258 self.test_tl_url_2 = 'aftl-test-server.google.ch' 259 260 self.test_sth_2 = aftltool.TrillianLogRootDescriptor() 261 self.test_sth_2.tree_size = 4 262 self.test_sth_2.root_hash_size = 32 263 self.test_sth_2.root_hash = b'e' * 32 264 self.test_sth_2.timestamp = 6 265 self.test_sth_2.revision = 7 266 self.test_sth_2.metadata_size = 2 267 self.test_sth_2.metadata = b'12' 268 269 self.test_sth_2_bytes = ( 270 b'\x00\x01' # version 271 b'\x00\x00\x00\x00\x00\x00\x00\x04' # tree_size 272 b'\x20' # root_hash_size 273 + b'e' * 32 + # root_hash 274 b'\x00\x00\x00\x00\x00\x00\x00\x06' # timestamp 275 b'\x00\x00\x00\x00\x00\x00\x00\x07' # revision 276 b'\x00\x02' # metadata_size 277 b'12' # metadata 278 ) 279 280 # Fill each structure with an easily observable pattern for easy validation. 281 self.test_proof_hashes_2 = [] 282 self.test_proof_hashes_2.append(b'g' * 32) 283 self.test_proof_hashes_2.append(b'h' * 32) 284 285 self.test_entry_2 = aftltool.AftlIcpEntry() 286 self.test_entry_2.log_url = self.test_tl_url_2 287 self.test_entry_2.leaf_index = 2 288 self.test_entry_2.annotation_leaf = self.test_anno_1 289 self.test_entry_2.log_root_descriptor = self.test_sth_2 290 self.test_entry_2.log_root_signature = b'd' * 512 291 self.test_entry_2.proofs = self.test_proof_hashes_2 292 293 self.test_entry_2_bytes = ( 294 b'\x00\x00\x00\x1a' # Transparency log url size. 295 b'\x00\x00\x00\x00\x00\x00\x00\x02' # Leaf index. 296 b'\x00\x00\x00\x3f' # Log root descriptor size. 297 b'\x00\x00\x00\x8b' # Annotation leaf size. 298 b'\x02\x00' # Log root signature size. 299 b'\x02' # Number of hashes in ICP. 300 b'\x00\x00\x00\x40' # Size of ICP in bytes. 301 + self.test_tl_url_2.encode('ascii') # Transparency log url. 302 + self.test_sth_2_bytes # Log root 303 + self.test_anno_1_bytes 304 + b'd' * 512 # Log root signature. 305 + b'g' * 32 # Hashes... 306 + b'h' * 32) 307 308 # Valid test AftlImage made out of AftlEntry #1 and #2. 309 self.test_aftl_desc = aftltool.AftlImage() 310 self.test_aftl_desc.add_icp_entry(self.test_entry_1) 311 self.test_aftl_desc.add_icp_entry(self.test_entry_2) 312 313 self.test_expected_aftl_image_bytes = ( 314 b'AFTL' # Magic. 315 + struct.pack('!L', avbtool.AVB_VERSION_MAJOR) # Major version. 316 + struct.pack('!L', avbtool.AVB_VERSION_MINOR) # Minor version. 317 + b'\x00\x00\x06\xcf' # Image size. 318 b'\x00\x02' # Number of ICP entries. 319 + self.test_entry_1_bytes 320 + self.test_entry_2_bytes) 321 322 self.test_avbm_resp = api_pb2.AddVBMetaResponse() 323 self.test_avbm_resp.annotation_proof.proof.leaf_index = 9127 324 hashes = [ 325 '61076ca285b4982669e67757f55682ddc43ab5c11ba671260f82a8efa8831f94', 326 '89c2fbcc58da25a65ce5e9b4fb22aaf208b20601f0bc023f73f05d35bc1f3bac', 327 '75d26b5f754b4bed332a3ce2a2bfea0334706a974b7e00ee663f0279fa8b446e', 328 'e1cd9c96feb893b5ef7771e424ac1c6c47509c2b98bc578d22ad07369c9641aa', 329 'e83e0e4dd352b1670a55f93f88781a73bb41efcadb9927399f59459dfa14bc40', 330 '8d5d25996117c88655d66f685baa3c94390867a040507b10587b17fbe92b496a', 331 '5de4c627e9ca712f207d6056f56f0d3286ed4a5381ed7f3cc1aa470217734138', 332 '19acfdb424d7fe28d1f850c76302f78f9a50146a5b9c65f9fdfbbc0173fd6993'] 333 for h in hashes: 334 self.test_avbm_resp.annotation_proof.proof.hashes.append( 335 binascii.unhexlify(h)) 336 self.test_avbm_resp.annotation_proof.sth.key_hint = binascii.unhexlify( 337 '5af859abce8fe1ea') 338 self.test_avbm_resp.annotation_proof.sth.log_root = binascii.unhexlify( 339 '0001' 340 '00000000000023a8' 341 '20' 342 '9a5f71340f8dc98bdc6320f976dda5f34db8554cb273ba5ab60f1697c519d6f6' 343 '1609ae15024774b1' 344 '0000000000001e5a' 345 '0000' 346 ) 347 self.test_avbm_resp.annotation_proof.sth.log_root_signature = ( 348 binascii.unhexlify( 349 '7c37903cc76e8689a6b31da9ad56c3daeb6194029510297cc7d147278390da33' 350 '09c4d9eb1f6be0cdcd1de5315b0b3b573cc9fcd8620d3fab956abbe3c597a572' 351 '46e5a5d277c4cc4b590872d0292fa64e1d3285626b1dedeb00b6aa0a7a0717c0' 352 '7d4c89b68fda9091be06180be1369675a7c4ce7f42cca133ef0daf8dcc5ba1ee' 353 '930cef6dcb71b0a7690446e19661c8e18c089a5d6f6fc9299a0592efb33a4db5' 354 '4c640027fa4f0ad0009f8bf75ec5fc17e0fa1091fabe74fe52738443745066ab' 355 '48f99b297809b863c01016abda17a2479fce91f9929c60bc2ce15e474204fc5a' 356 '8e79b2190aadb7c149671e8c76a4da506860f8d6020fb2eaabfee025cc267bad' 357 '3c8257186c8aaf1da9eefe50cae4b3e8deb66033ebc4bfcda2b317f9e7d2dd78' 358 'b47f2d86795815d82058ad4cba8fc7983a3bbf843e9b8c7ec7f1ae137be6848d' 359 '03c76eefdac40ce5e66cc23d9f3e79ad87acbe7ec0c0bb419a7d368ae1e73c85' 360 '742871f847bde69c871e8797638e0e270282fb058ef1cbcba52aded9dcc8249b' 361 '38fbed8424c33b8cfcde4f49797c64dda8d089d73b84062602fd41c66091543c' 362 'e13c18cfa7f8300530ad4b7adb8924bbb86d17bcc5f1d3d74c522a7dcc8c3c1f' 363 '28a999f2fe1bfe5520c66f93f7c90996dc7f52e62dd95ace9ceace90324c3040' 364 '669b7f5aeb5c5a53f217f1de46e32f80d0aaaf7d9cc9d0e8f8fd7026c612103a' 365 ) 366 ) 367 368 anno = aftltool.VBMetaPrimaryAnnotation( 369 vbmeta_hash=bytes.fromhex( 370 '5623731104bfa1cfdb275df2978d1f93f72f5f0f746f11d06f3091c601c32067'), 371 version_incremental='only_for_testing', 372 manufacturer_key_hash=bytes.fromhex( 373 '83ab3b109b73a1d32dce4153a2de57a1a0485052db8364f3180d98614749d7f7')) 374 raw_signature = bytes.fromhex( 375 '6a523021bc5b933bb58c38c8238be3a5fe1166002f5df8b77dee9dd22d353595' 376 'be7996656d3824ebf4e1411a05ee3652d64669d3d62b167d3290dbdf4f2741ba' 377 '4b6472e1bd71fc1860465fdcdca1ff08c4ab0420d7dcbf4ad144f64e211d8f92' 378 '081ba51192358e2478195e573d000282423b23e6dd945069907dcf11520ff11a' 379 '250e26643b820f8a5d80ccfe7d5d84f58e549cd05630f2254ade8edc88d9aa8a' 380 'ec2089f84643854e1f265a4f746598ce4cae529c4eaa637f6e35fa1d1da9254e' 381 'ec8dfede7a4313f7b151547dcdde98782ce6fb3149326ee5b8e750813d3fd37a' 382 '738fe92f6111bf0dff4091769e216b842980e05716f2e50268a7dcca430e175e' 383 '711f80e41a1a28f20635741ac11a56f97492d30db6d1955a827daf8e83faebe5' 384 'a96e18a13c558ae561a02c90982514c853db0296c2e791e68b77c30e6232a3b7' 385 'ed355441d4706277f33a01735f56cb8279336491731939691683f96f1c3e3183' 386 'a0b77510d6ff0199b7688902044829793106546fd6fd4a5294d63c31c91256ad' 387 'f7be6d053e77875698ad32ffaaeaac5d54b432e537f72549d2543072ae35578f' 388 '138d82afcadd668511ba276ce02b6f9c18ef3b6f2f6ae0d123e9f8cb930f21a9' 389 'c49a6d9e95de741c7860593a956735e1b77e9851ecb1f6572abf6e2c8ba15085' 390 'e37e0f7bab0a30d108b997ed5edd74cf7f89cf082590a6f0af7a3a1f68c0077a') 391 signature = aftltool.Signature(signature=raw_signature) 392 signed_anno = aftltool.SignedVBMetaPrimaryAnnotation(annotation=anno, 393 signature=signature) 394 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf( 395 timestamp=1587991742919072870, 396 signed_vbmeta_primary_annotation=signed_anno).encode() 397 self.test_avbm_resp.annotation_leaf = leaf 398 399 400 def tearDown(self): 401 """Tears down the test bed for the unit tests.""" 402 # Reconnects stderr back to the normal stderr; see setUp() for details. 403 sys.stderr = self.stderr 404 self.null.close() 405 406 super(AftltoolTestCase, self).tearDown() 407 408 def get_testdata_path(self, relative_path): 409 """Retrieves the absolute path for testdata given the relative path. 410 411 Arguments: 412 relative_path: The relative path to the testdata in the testdata 413 directory. 414 415 Returns: 416 The absolute path to the testdata. 417 """ 418 rel_path_parts = ['test', 'data'] 419 rel_path_parts.extend(relative_path.split(os.path.sep)) 420 return os.path.join(TEST_EXEC_PATH, *rel_path_parts) 421 422 423class AftltoolTest(AftltoolTestCase): 424 425 def test_merkle_root_hash(self): 426 """Tests validation of inclusion proof and the merkle tree calculations. 427 428 The test vectors have been taken from the Trillian tests: 429 https://github.com/google/trillian/blob/v1.3.3/merkle/log_verifier_test.go 430 """ 431 432 inclusion_proofs = [ 433 (1, 434 8, 435 [ 436 binascii.unhexlify('96a296d224f285c67bee93c30f8a3091' 437 '57f0daa35dc5b87e410b78630a09cfc7'), 438 binascii.unhexlify('5f083f0a1a33ca076a95279832580db3' 439 'e0ef4584bdff1f54c8a360f50de3031e'), 440 binascii.unhexlify('6b47aaf29ee3c2af9af889bc1fb9254d' 441 'abd31177f16232dd6aab035ca39bf6e4') 442 ]), 443 (6, 444 8, 445 [ 446 binascii.unhexlify('bc1a0643b12e4d2d7c77918f44e0f4f7' 447 '9a838b6cf9ec5b5c283e1f4d88599e6b'), 448 binascii.unhexlify('ca854ea128ed050b41b35ffc1b87b8eb' 449 '2bde461e9e3b5596ece6b9d5975a0ae0'), 450 binascii.unhexlify('d37ee418976dd95753c1c73862b9398f' 451 'a2a2cf9b4ff0fdfe8b30cd95209614b7') 452 ]), 453 (3, 454 3, 455 [ 456 binascii.unhexlify('fac54203e7cc696cf0dfcb42c92a1d9d' 457 'baf70ad9e621f4bd8d98662f00e3c125') 458 ]), 459 (2, 460 5, 461 [ 462 binascii.unhexlify('6e340b9cffb37a989ca544e6bb780a2c' 463 '78901d3fb33738768511a30617afa01d'), 464 binascii.unhexlify('5f083f0a1a33ca076a95279832580db3' 465 'e0ef4584bdff1f54c8a360f50de3031e'), 466 binascii.unhexlify('bc1a0643b12e4d2d7c77918f44e0f4f7' 467 '9a838b6cf9ec5b5c283e1f4d88599e6b') 468 ] 469 ) 470 ] 471 472 leaves = [ 473 binascii.unhexlify(''), 474 binascii.unhexlify('00'), 475 binascii.unhexlify('10'), 476 binascii.unhexlify('2021'), 477 binascii.unhexlify('3031'), 478 binascii.unhexlify('40414243'), 479 binascii.unhexlify('5051525354555657'), 480 binascii.unhexlify('606162636465666768696a6b6c6d6e6f'), 481 ] 482 483 roots = [ 484 binascii.unhexlify('6e340b9cffb37a989ca544e6bb780a2c' 485 '78901d3fb33738768511a30617afa01d'), 486 binascii.unhexlify('fac54203e7cc696cf0dfcb42c92a1d9d' 487 'baf70ad9e621f4bd8d98662f00e3c125'), 488 binascii.unhexlify('aeb6bcfe274b70a14fb067a5e5578264' 489 'db0fa9b51af5e0ba159158f329e06e77'), 490 binascii.unhexlify('d37ee418976dd95753c1c73862b9398f' 491 'a2a2cf9b4ff0fdfe8b30cd95209614b7'), 492 binascii.unhexlify('4e3bbb1f7b478dcfe71fb631631519a3' 493 'bca12c9aefca1612bfce4c13a86264d4'), 494 binascii.unhexlify('76e67dadbcdf1e10e1b74ddc608abd2f' 495 '98dfb16fbce75277b5232a127f2087ef'), 496 binascii.unhexlify('ddb89be403809e325750d3d263cd7892' 497 '9c2942b7942a34b77e122c9594a74c8c'), 498 binascii.unhexlify('5dc9da79a70659a9ad559cb701ded9a2' 499 'ab9d823aad2f4960cfe370eff4604328'), 500 ] 501 502 for icp in inclusion_proofs: 503 leaf_id = icp[0] - 1 504 leaf_hash = aftltool.rfc6962_hash_leaf(leaves[leaf_id]) 505 root_hash = aftltool.root_from_icp(leaf_id, icp[1], icp[2], leaf_hash) 506 self.assertEqual(root_hash, roots[icp[1] -1]) 507 508 509class AftlImageTest(AftltoolTestCase): 510 511 def test__init__(self): 512 """Tests the constructor.""" 513 # Calls constructor without data. 514 d = aftltool.AftlImage() 515 self.assertIsInstance(d.image_header, aftltool.AftlImageHeader) 516 self.assertEqual(d.image_header.icp_count, 0) 517 self.assertEqual(d.icp_entries, []) 518 self.assertTrue(d.is_valid()) 519 520 # Calls constructor with data. 521 d = aftltool.AftlImage(self.test_expected_aftl_image_bytes) 522 self.assertIsInstance(d.image_header, aftltool.AftlImageHeader) 523 self.assertEqual(d.image_header.icp_count, 2) 524 self.assertEqual(len(d.icp_entries), 2) 525 for entry in d.icp_entries: 526 self.assertIsInstance(entry, aftltool.AftlIcpEntry) 527 self.assertTrue(d.is_valid()) 528 529 def test_add_icp_entry(self): 530 """Tests the add_icp_entry method.""" 531 d = aftltool.AftlImage() 532 533 # Adds 1st ICP. 534 d.add_icp_entry(self.test_entry_1) 535 self.assertEqual(d.image_header.icp_count, 1) 536 self.assertEqual(len(d.icp_entries), 1) 537 self.assertTrue(d.is_valid()) 538 539 # Adds 2nd ICP. 540 d.add_icp_entry(self.test_entry_2) 541 self.assertEqual(d.image_header.icp_count, 2) 542 self.assertEqual(len(d.icp_entries), 2) 543 self.assertTrue(d.is_valid()) 544 545 def test_verify_vbmeta_image_with_1_icp(self): 546 """Tests the verify_vbmeta_image method.""" 547 # Valid vbmeta image without footer with 1 ICP. 548 tool = aftltool.Aftl() 549 image_path = self.get_testdata_path( 550 'aftl_output_vbmeta_with_1_icp.img') 551 vbmeta_image, _ = tool.get_vbmeta_image(image_path) 552 desc = tool.get_aftl_image(image_path) 553 554 # Valid image checked against correct log key. 555 self.assertTrue(desc.verify_vbmeta_image( 556 vbmeta_image, [self.get_testdata_path('aftl_pubkey_1.pem')])) 557 558 # Valid image checked with a key from another log. 559 self.assertFalse(desc.verify_vbmeta_image( 560 vbmeta_image, [self.get_testdata_path('testkey_rsa4096_pub.pem')])) 561 562 # Valid image checked with non existed key file path. 563 self.assertFalse(desc.verify_vbmeta_image( 564 vbmeta_image, [self.get_testdata_path('non_existent_blabli')])) 565 566 # Valid image checked with an invalid key. 567 self.assertFalse(desc.verify_vbmeta_image( 568 vbmeta_image, [self.get_testdata_path('large_blob.bin')])) 569 570 # Valid image checked with empty list of keys. 571 self.assertFalse(desc.verify_vbmeta_image(vbmeta_image, [])) 572 573 # Valid image checked with empty list of keys. 574 self.assertFalse(desc.verify_vbmeta_image(vbmeta_image, None)) 575 576 def test_verify_vbmeta_image_with_2_icp_from_same_log(self): 577 """Tests the verify_vbmeta_image method.""" 578 # Valid vbmeta image without footer with 2 ICPs from same log. 579 tool = aftltool.Aftl() 580 image_path = self.get_testdata_path( 581 'aftl_output_vbmeta_with_2_icp_same_log.img') 582 vbmeta_image, _ = tool.get_vbmeta_image(image_path) 583 desc = tool.get_aftl_image(image_path) 584 585 # Valid image checked against correct log key. 586 self.assertTrue(desc.verify_vbmeta_image( 587 vbmeta_image, [self.get_testdata_path('aftl_pubkey_1.pem')])) 588 589 # Valid vbmeta image checked with key from another log. 590 self.assertFalse(desc.verify_vbmeta_image( 591 vbmeta_image, [self.get_testdata_path('testkey_rsa4096_pub.pem')])) 592 593 # Valid image checked with non existed key file path. 594 self.assertFalse(desc.verify_vbmeta_image( 595 vbmeta_image, [self.get_testdata_path('non_existent_blabli')])) 596 597 # Valid image checked with invalid key. 598 self.assertFalse(desc.verify_vbmeta_image( 599 vbmeta_image, [self.get_testdata_path('large_blob.bin')])) 600 601 # Valid image but checked with empty list of keys. 602 self.assertFalse(desc.verify_vbmeta_image(vbmeta_image, [])) 603 604 def test_encode(self): 605 """Tests encode method.""" 606 desc_bytes = self.test_aftl_desc.encode() 607 self.assertEqual(desc_bytes, self.test_expected_aftl_image_bytes) 608 609 def test_is_valid(self): 610 """Tests is_valid method.""" 611 d = aftltool.AftlImage() 612 d.add_icp_entry(self.test_entry_1) 613 d.add_icp_entry(self.test_entry_2) 614 615 # Force invalid ICP header. 616 old_magic = d.image_header.magic 617 d.image_header.magic = b'YOLO' 618 self.assertFalse(d.is_valid()) 619 d.image_header.magic = old_magic 620 self.assertTrue(d.is_valid()) 621 622 # Force count mismatch between header and actual entries. 623 old_icp_count = d.image_header.icp_count 624 d.image_header.icp_count = 1 625 self.assertFalse(d.is_valid()) 626 d.image_header.icp_count = old_icp_count 627 self.assertTrue(d.is_valid()) 628 629 # Force invalid ICP entry. 630 old_leaf_index = d.icp_entries[0].leaf_index 631 d.icp_entries[0].leaf_index = -10 632 self.assertFalse(d.is_valid()) 633 d.icp_entries[0].leaf_index = old_leaf_index 634 self.assertTrue(d.is_valid()) 635 636 def test_print_desc(self): 637 """Tests print_desc method.""" 638 buf = io.StringIO() 639 self.test_aftl_desc.print_desc(buf) 640 desc = buf.getvalue() 641 642 # Cursory check whether the printed description contains something useful. 643 self.assertGreater(len(desc), 0) 644 self.assertIn('Log Root Descriptor:', desc) 645 646 647class AftlImageHeaderTest(AftltoolTestCase): 648 """Test suite for testing the AftlImageHeader descriptor.""" 649 650 def setUp(self): 651 """Sets up the test bed for the unit tests.""" 652 super(AftlImageHeaderTest, self).setUp() 653 654 self.test_header_valid = aftltool.AftlImageHeader() 655 self.test_header_valid.icp_count = 1 656 657 self.test_header_invalid = aftltool.AftlImageHeader() 658 self.test_header_invalid.icp_count = -34 659 660 self.test_header_bytes = ( 661 b'AFTL' # Magic. 662 + struct.pack('!L', avbtool.AVB_VERSION_MAJOR) # Major version. 663 + struct.pack('!L', avbtool.AVB_VERSION_MINOR) # Minor version. 664 + b'\x00\x00\x00\x12' # Image size. 665 b'\x00\x01') # Number of ICP entries. 666 667 def test__init__(self): 668 """Tests constructor.""" 669 670 # Calls constructor without data. 671 header = aftltool.AftlImageHeader() 672 self.assertEqual(header.magic, b'AFTL') 673 self.assertEqual(header.required_icp_version_major, 674 avbtool.AVB_VERSION_MAJOR) 675 self.assertEqual(header.required_icp_version_minor, 676 avbtool.AVB_VERSION_MINOR) 677 self.assertEqual(header.aftl_image_size, aftltool.AftlImageHeader.SIZE) 678 self.assertEqual(header.icp_count, 0) 679 self.assertTrue(header.is_valid()) 680 681 # Calls constructor with data. 682 header = aftltool.AftlImageHeader(self.test_header_bytes) 683 self.assertEqual(header.magic, b'AFTL') 684 self.assertEqual(header.required_icp_version_major, 685 avbtool.AVB_VERSION_MAJOR) 686 self.assertEqual(header.required_icp_version_minor, 687 avbtool.AVB_VERSION_MINOR) 688 self.assertEqual(header.aftl_image_size, aftltool.AftlImageHeader.SIZE) 689 self.assertTrue(header.icp_count, 1) 690 self.assertTrue(header.is_valid()) 691 692 def test_encode(self): 693 """Tests encode method.""" 694 # Valid header. 695 header_bytes = self.test_header_valid.encode() 696 self.assertEqual(header_bytes, self.test_header_bytes) 697 698 # Invalid header 699 with self.assertRaises(aftltool.AftlError): 700 header_bytes = self.test_header_invalid.encode() 701 702 def test_is_valid(self): 703 """Tests is_valid method.""" 704 # Valid default record. 705 header = aftltool.AftlImageHeader() 706 self.assertTrue(header.is_valid()) 707 708 # Invalid magic. 709 header = aftltool.AftlImageHeader() 710 header.magic = b'YOLO' 711 self.assertFalse(header.is_valid()) 712 713 # Valid ICP count. 714 self.assertTrue(self.test_header_valid.is_valid()) 715 716 # Invalid ICP count. 717 self.assertFalse(self.test_header_invalid.is_valid()) 718 719 header = aftltool.AftlImageHeader() 720 header.icp_count = 10000000 721 self.assertFalse(header.is_valid()) 722 723 # Invalid ICP major version. 724 header = aftltool.AftlImageHeader() 725 header.required_icp_version_major = avbtool.AVB_VERSION_MAJOR + 1 726 self.assertFalse(header.is_valid()) 727 728 # Invalid ICP minor version. 729 header = aftltool.AftlImageHeader() 730 header.required_icp_version_minor = avbtool.AVB_VERSION_MINOR + 1 731 self.assertFalse(header.is_valid()) 732 733 def test_print_desc(self): 734 """Tests print_desc method.""" 735 buf = io.StringIO() 736 self.test_header_valid.print_desc(buf) 737 desc = buf.getvalue() 738 739 # Cursory check whether the printed description contains something useful. 740 self.assertGreater(len(desc), 0) 741 self.assertIn('Major version:', desc) 742 743 744class AftlIcpEntryTest(AftltoolTestCase): 745 """Test suite for testing the AftlIcpEntry descriptor.""" 746 747 def test__init__and_properties(self): 748 """Tests constructor and properties methods.""" 749 750 # Calls constructor without data. 751 entry = aftltool.AftlIcpEntry() 752 self.assertEqual(entry.log_url_size, 0) 753 self.assertEqual(entry.leaf_index, 0) 754 self.assertEqual(entry.log_root_descriptor_size, 29) 755 self.assertEqual(entry.annotation_leaf_size, 19) 756 self.assertEqual(entry.log_root_sig_size, 0) 757 self.assertEqual(entry.proof_hash_count, 0) 758 self.assertEqual(entry.inc_proof_size, 0) 759 self.assertEqual(entry.log_url, '') 760 self.assertIsInstance(entry.log_root_descriptor, 761 aftltool.TrillianLogRootDescriptor) 762 self.assertEqual(entry.proofs, []) 763 self.assertTrue(entry.is_valid()) 764 765 # Calls constructor with data. 766 entry = aftltool.AftlIcpEntry(self.test_entry_1_bytes) 767 self.assertEqual(entry.log_url_size, len(self.test_tl_url_1)) 768 self.assertEqual(entry.leaf_index, 1) 769 self.assertEqual(entry.annotation_leaf_size, 139) 770 self.assertEqual(entry.log_root_sig_size, 512) 771 self.assertEqual(entry.proof_hash_count, len(self.test_proof_hashes_1)) 772 self.assertEqual(entry.inc_proof_size, 128) 773 self.assertEqual(entry.log_url, self.test_tl_url_1) 774 self.assertEqual(entry.proofs, self.test_proof_hashes_1) 775 self.assertTrue(entry.is_valid()) 776 777 def test_encode(self): 778 """Tests encode method.""" 779 entry_bytes = self.test_entry_1.encode() 780 self.assertEqual(entry_bytes, self.test_entry_1_bytes) 781 782 def test_get_expected_size(self): 783 """Tests get_expected_size method.""" 784 # Default record. 785 entry = aftltool.AftlIcpEntry() 786 self.assertEqual(entry.get_expected_size(), 75) 787 self.assertEqual(entry.get_expected_size(), len(entry.encode())) 788 789 # Test record. 790 self.assertEqual(self.test_entry_1.get_expected_size(), 894) 791 self.assertEqual(self.test_entry_1.get_expected_size(), 792 len(self.test_entry_1.encode())) 793 794 def test_is_valid(self): 795 """Tests is_valid method.""" 796 # Valid default record. 797 entry = aftltool.AftlIcpEntry() 798 entry.leaf_index = 2 799 entry.log_url = self.test_tl_url_1 800 entry.set_log_root_descriptor = self.test_sth_1 801 entry.proofs = self.test_proof_hashes_1 802 self.assertTrue(entry.is_valid()) 803 804 # Invalid leaf index. 805 entry = aftltool.AftlIcpEntry() 806 entry.leaf_index = -1 807 self.assertFalse(entry.is_valid()) 808 809 # Invalid log_root_descriptor 810 entry = aftltool.AftlIcpEntry() 811 entry.log_root_descriptor = None 812 self.assertFalse(entry.is_valid()) 813 814 entry.log_root_descriptor = b'' 815 self.assertFalse(entry.is_valid()) 816 817 entry.log_root_descriptor = b'blabli' 818 self.assertFalse(entry.is_valid()) 819 820 def test_translate_response(self): 821 """Tests translate_response method.""" 822 entry = aftltool.AftlIcpEntry() 823 entry.translate_response('aftl-test.foo.bar:80', self.test_avbm_resp) 824 self.assertEqual(entry.log_url, 'aftl-test.foo.bar:80') 825 self.assertEqual(entry.leaf_index, 9127) 826 self.assertEqual(entry.log_root_descriptor.encode(), 827 self.test_avbm_resp.annotation_proof.sth.log_root) 828 self.assertEqual( 829 entry.log_root_signature, 830 self.test_avbm_resp.annotation_proof.sth.log_root_signature) 831 self.assertEqual( 832 entry.proofs, 833 self.test_avbm_resp.annotation_proof.proof.hashes) 834 835 def test_verify_icp(self): 836 """Tests verify_icp method.""" 837 with tempfile.NamedTemporaryFile('wt+') as key_file: 838 key_file.write(self.test_aftl_pub_key) 839 key_file.flush() 840 841 # Valid ICP. 842 entry = aftltool.AftlIcpEntry() 843 entry.translate_response(self.test_tl_url_1, self.test_avbm_resp) 844 self.assertTrue(entry.verify_icp(key_file.name)) 845 846 # Invalid ICP where annotation_leaf is not matching up with proofs. 847 # pylint: disable=protected-access 848 entry = aftltool.AftlIcpEntry() 849 entry.translate_response(self.test_tl_url_1, self.test_avbm_resp) 850 vbmeta_hash = entry.annotation_leaf.annotation.vbmeta_hash 851 vbmeta_hash = vbmeta_hash.replace(b"\x56\x23\x73\x11", 852 b"\x00\x00\x00\x00") 853 entry.annotation_leaf.annotation.vbmeta_hash = vbmeta_hash 854 self.assertFalse(entry.verify_icp(key_file)) 855 856 def test_verify_vbmeta_image(self): 857 """Tests the verify_vbmeta_image method.""" 858 # Valid vbmeta image without footer with 1 ICP. 859 tool = aftltool.Aftl() 860 image_path = self.get_testdata_path( 861 'aftl_output_vbmeta_with_1_icp.img') 862 vbmeta_image, _ = tool.get_vbmeta_image(image_path) 863 desc = tool.get_aftl_image(image_path) 864 865 # Checks that there is 1 ICP. 866 self.assertEqual(desc.image_header.icp_count, 1) 867 entry = desc.icp_entries[0] 868 869 # Valid vbmeta image checked with correct log key. 870 self.assertTrue(entry.verify_vbmeta_image( 871 vbmeta_image, self.get_testdata_path('aftl_pubkey_1.pem'))) 872 873 # Valid vbmeta image checked with public key of another log. 874 self.assertFalse(entry.verify_vbmeta_image( 875 vbmeta_image, self.get_testdata_path('testkey_rsa4096_pub.pem'))) 876 877 # Valid vbmeta image checked with invalid key. 878 self.assertFalse(entry.verify_vbmeta_image( 879 vbmeta_image, self.get_testdata_path('large_blob.bin'))) 880 881 # Valid vbmeta image checked with no key. 882 self.assertFalse(entry.verify_vbmeta_image(vbmeta_image, None)) 883 884 def test_verify_invalid_vbmeta_image(self): 885 """Tests the verify_vbmeta_image method.""" 886 # Valid vbmeta image without footer with 1 ICP. 887 tool = aftltool.Aftl() 888 image_path = self.get_testdata_path( 889 'aftl_output_vbmeta_with_1_icp.img') 890 vbmeta_image, _ = tool.get_vbmeta_image(image_path) 891 desc = tool.get_aftl_image(image_path) 892 893 self.assertEqual(desc.image_header.icp_count, 1) 894 entry = desc.icp_entries[0] 895 896 # Modify vbmeta image to become invalid 897 vbmeta_image = b'A' * len(vbmeta_image) 898 899 # Invalid vbmeta image checked with correct log key. 900 self.assertFalse(entry.verify_vbmeta_image( 901 vbmeta_image, self.get_testdata_path('aftl_pubkey_1.pem'))) 902 903 # Invalid vbmeta image checked with invalid key. 904 self.assertFalse(entry.verify_vbmeta_image( 905 vbmeta_image, self.get_testdata_path('large_blob.bin'))) 906 907 # Valid vbmeta image checked with no key. 908 self.assertFalse(entry.verify_vbmeta_image(vbmeta_image, None)) 909 910 # None image checked with a key. 911 self.assertFalse(entry.verify_vbmeta_image( 912 None, self.get_testdata_path('aftl_pubkey_1.pem'))) 913 914 def test_print_desc(self): 915 """Tests print_desc method.""" 916 buf = io.StringIO() 917 self.test_entry_1.print_desc(buf) 918 desc = buf.getvalue() 919 920 # Cursory check whether the printed description contains something useful. 921 self.assertGreater(len(desc), 0) 922 self.assertIn('ICP hashes:', desc) 923 924 925class TrillianLogRootDescriptorTest(AftltoolTestCase): 926 """Test suite for testing the TrillianLogRootDescriptor descriptor.""" 927 928 def setUp(self): 929 """Sets up the test bed for the unit tests.""" 930 super(TrillianLogRootDescriptorTest, self).setUp() 931 932 # Creates basic log root without metadata fields. 933 base_log_root = ( 934 '0001' # version 935 '00000000000002e5' # tree_size 936 '20' # root_hash_size 937 '2d614759ad408a111a3351c0cb33c099' # root_hash 938 '422c30a5c5104788a343332bde2b387b' 939 '15e1c97e3b4bd239' # timestamp 940 '00000000000002e4' # revision 941 ) 942 943 # Create valid log roots with metadata fields w/ and w/o metadata. 944 self.test_log_root_bytes_wo_metadata = binascii.unhexlify( 945 base_log_root + '0000') 946 self.test_log_root_bytes_with_metadata = binascii.unhexlify( 947 base_log_root + '00023132') 948 949 def test__init__(self): 950 """Tests constructor.""" 951 # Calls constructor without data. 952 d = aftltool.TrillianLogRootDescriptor() 953 self.assertTrue(d.is_valid()) 954 self.assertEqual(d.version, 1) 955 self.assertEqual(d.tree_size, 0) 956 self.assertEqual(d.root_hash_size, 0) 957 self.assertEqual(d.root_hash, b'') 958 self.assertEqual(d.timestamp, 0) 959 self.assertEqual(d.revision, 0) 960 self.assertEqual(d.metadata_size, 0) 961 self.assertEqual(d.metadata, b'') 962 963 # Calls constructor with log_root w/o metadata 964 d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata) 965 self.assertTrue(d.is_valid()) 966 self.assertEqual(d.version, 1) 967 self.assertEqual(d.tree_size, 741) 968 self.assertEqual(d.root_hash_size, 32) 969 self.assertEqual(d.root_hash, 970 binascii.unhexlify('2d614759ad408a111a3351c0cb33c099' 971 '422c30a5c5104788a343332bde2b387b')) 972 self.assertEqual(d.timestamp, 1576762888554271289) 973 self.assertEqual(d.revision, 740) 974 self.assertEqual(d.metadata_size, 0) 975 self.assertEqual(d.metadata, b'') 976 977 # Calls constructor with log_root with metadata 978 d = aftltool.TrillianLogRootDescriptor( 979 self.test_log_root_bytes_with_metadata) 980 self.assertEqual(d.metadata_size, 2) 981 self.assertEqual(d.metadata, b'12') 982 983 def test_get_expected_size(self): 984 """Tests get_expected_size method.""" 985 # Default constructor. 986 d = aftltool.TrillianLogRootDescriptor() 987 self.assertEqual(d.get_expected_size(), 11 + 18) 988 989 # Log root without metadata. 990 d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata) 991 self.assertEqual(d.get_expected_size(), 11 + 18 + 32) 992 993 # Log root with metadata. 994 d = aftltool.TrillianLogRootDescriptor( 995 self.test_log_root_bytes_with_metadata) 996 self.assertEqual(d.get_expected_size(), 11 + 18 + 32 + 2) 997 998 def test_encode(self): 999 """Tests encode method.""" 1000 # Log root from default constructor. 1001 d = aftltool.TrillianLogRootDescriptor() 1002 expected_bytes = ( 1003 '0001' # version 1004 '0000000000000000' # tree_size 1005 '00' # root_hash_size 1006 '' # root_hash (empty) 1007 '0000000000000000' # timestamp 1008 '0000000000000000' # revision 1009 '0000' # metadata size 1010 '' # metadata (empty) 1011 ) 1012 self.assertEqual(d.encode(), binascii.unhexlify(expected_bytes)) 1013 1014 # Log root without metadata. 1015 d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata) 1016 self.assertEqual(d.encode(), self.test_log_root_bytes_wo_metadata) 1017 1018 # Log root with metadata. 1019 d = aftltool.TrillianLogRootDescriptor( 1020 self.test_log_root_bytes_with_metadata) 1021 self.assertEqual(d.encode(), self.test_log_root_bytes_with_metadata) 1022 1023 def test_is_valid(self): 1024 """Tests is_valid method.""" 1025 d = aftltool.TrillianLogRootDescriptor() 1026 self.assertTrue(d.is_valid()) 1027 1028 # Invalid version. 1029 d = aftltool.TrillianLogRootDescriptor() 1030 d.version = 2 1031 self.assertFalse(d.is_valid()) 1032 1033 # Invalid tree_size. 1034 d = aftltool.TrillianLogRootDescriptor() 1035 d.tree_size = -1 1036 self.assertFalse(d.is_valid()) 1037 1038 # Invalid root_hash_size. 1039 d = aftltool.TrillianLogRootDescriptor() 1040 d.root_hash_size = -1 1041 self.assertFalse(d.is_valid()) 1042 d.root_hash_size = 300 1043 self.assertFalse(d.is_valid()) 1044 1045 # Invalid/valid root_hash_size / root_hash combination. 1046 d = aftltool.TrillianLogRootDescriptor() 1047 d.root_hash_size = 4 1048 d.root_hash = b'123' 1049 self.assertFalse(d.is_valid()) 1050 d.root_hash = b'1234' 1051 self.assertTrue(d.is_valid()) 1052 1053 # Invalid timestamp. 1054 d = aftltool.TrillianLogRootDescriptor() 1055 d.timestamp = -1 1056 self.assertFalse(d.is_valid()) 1057 1058 # Invalid revision. 1059 d = aftltool.TrillianLogRootDescriptor() 1060 d.revision = -1 1061 self.assertFalse(d.is_valid()) 1062 1063 # Invalid metadata_size. 1064 d = aftltool.TrillianLogRootDescriptor() 1065 d.metadata_size = -1 1066 self.assertFalse(d.is_valid()) 1067 d.metadata_size = 70000 1068 self.assertFalse(d.is_valid()) 1069 1070 # Invalid/valid metadata_size / metadata combination. 1071 d = aftltool.TrillianLogRootDescriptor() 1072 d.metadata_size = 4 1073 d.metadata = b'123' 1074 self.assertFalse(d.is_valid()) 1075 d.metadata = b'1234' 1076 self.assertTrue(d.is_valid()) 1077 1078 def test_print_desc(self): 1079 """Tests print_desc method.""" 1080 # Log root without metadata 1081 buf = io.StringIO() 1082 d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata) 1083 d.print_desc(buf) 1084 desc = buf.getvalue() 1085 1086 # Cursory check whether the printed description contains something useful. 1087 self.assertGreater(len(desc), 0) 1088 self.assertIn('Version:', desc) 1089 self.assertNotIn('Metadata:', desc) 1090 1091 # Log root with metadata 1092 buf = io.StringIO() 1093 d = aftltool.TrillianLogRootDescriptor( 1094 self.test_log_root_bytes_with_metadata) 1095 d.print_desc(buf) 1096 desc = buf.getvalue() 1097 1098 # Cursory check whether the printed description contains something useful. 1099 self.assertGreater(len(desc), 0) 1100 self.assertIn('Version:', desc) 1101 self.assertIn('Metadata:', desc) 1102 1103 1104class SignedVBMetaPrimaryAnnotationLeafTest(AftltoolTestCase): 1105 """Test suite for testing the Leaf.""" 1106 1107 def test__init__(self): 1108 """Tests constructor and properties methods.""" 1109 # Calls constructor without data. 1110 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf() 1111 self.assertEqual(leaf.version, 1) 1112 self.assertEqual(leaf.timestamp, 0) 1113 self.assertEqual(leaf.signature.signature, b'') 1114 self.assertEqual(leaf.annotation.vbmeta_hash, b'') 1115 self.assertEqual(leaf.annotation.description, '') 1116 1117 def test_parse(self): 1118 # Calls parse with valid data. 1119 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf.parse( 1120 self.test_anno_1_bytes) 1121 self.assertEqual(leaf.annotation.vbmeta_hash, b'w'*32) 1122 self.assertEqual(leaf.annotation.version_incremental, 'x'*5) 1123 self.assertEqual(leaf.annotation.manufacturer_key_hash, b'y'*32) 1124 self.assertEqual(leaf.annotation.description, 'z'*51) 1125 1126 # Calls parse with invalid data. 1127 with self.assertRaises(aftltool.AftlError): 1128 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf.parse(b'Invalid data') 1129 1130 def test_get_expected_size(self): 1131 """Tests get_expected_size method.""" 1132 # Calls constructor without data. 1133 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf() 1134 self.assertEqual(leaf.get_expected_size(), 19) 1135 1136 # Calls constructor with data. 1137 leaf = aftltool.SignedVBMetaPrimaryAnnotationLeaf.parse( 1138 self.test_anno_1_bytes) 1139 self.assertEqual(leaf.get_expected_size(), 1140 len(self.test_anno_1_bytes)) 1141 1142 def test_encode(self): 1143 """Tests encode method.""" 1144 # Calls constructor with data. 1145 self.assertEqual(self.test_anno_1.encode(), 1146 self.test_anno_1_bytes) 1147 1148 def test_print_desc(self): 1149 """Tests print_desc method.""" 1150 buf = io.StringIO() 1151 self.test_anno_1.print_desc(buf) 1152 desc = buf.getvalue() 1153 1154 # Cursory check whether the printed description contains something useful. 1155 self.assertGreater(len(desc), 0) 1156 self.assertIn('VBMeta hash:', desc) 1157 1158 1159class AftlMockCommunication(aftltool.AftlCommunication): 1160 """Testing Mock implementation of AftlCommunication.""" 1161 1162 def __init__(self, transparency_log_config, canned_response): 1163 """Initializes the object. 1164 1165 Arguments: 1166 transparency_log_config: An aftltool.TransparencyLogConfig instance. 1167 canned_response: AddVBMetaResponse to return or the Exception to 1168 raise. 1169 """ 1170 super(AftlMockCommunication, self).__init__(transparency_log_config, 1171 timeout=None) 1172 self.request = None 1173 self.canned_response = canned_response 1174 1175 def add_vbmeta(self, request): 1176 """Records the request and returns the canned response.""" 1177 self.request = request 1178 1179 if isinstance(self.canned_response, aftltool.AftlError): 1180 raise self.canned_response 1181 return self.canned_response 1182 1183 1184class AftlMock(aftltool.Aftl): 1185 """Mock for aftltool.Aftl to mock the communication piece.""" 1186 1187 def __init__(self, canned_response): 1188 """Initializes the object. 1189 1190 Arguments: 1191 canned_response: AddVBMetaResponse to return or the Exception to 1192 raise. 1193 """ 1194 self.mock_canned_response = canned_response 1195 1196 def request_inclusion_proof(self, transparency_log_config, vbmeta_image, 1197 version_inc, manufacturer_key_path, 1198 signing_helper, signing_helper_with_files, 1199 timeout, aftl_comms=None): 1200 """Mocked request_inclusion_proof function.""" 1201 aftl_comms = AftlMockCommunication(transparency_log_config, 1202 self.mock_canned_response) 1203 return super(AftlMock, self).request_inclusion_proof( 1204 transparency_log_config, vbmeta_image, version_inc, 1205 manufacturer_key_path, signing_helper, signing_helper_with_files, 1206 timeout, aftl_comms=aftl_comms) 1207 1208 1209class AftlTestCase(AftltoolTestCase): 1210 1211 def setUp(self): 1212 """Sets up the test bed for the unit tests.""" 1213 super(AftlTestCase, self).setUp() 1214 1215 # Sets up the member variables which are then configured by 1216 # set_up_environment() in the subclasses. 1217 self.aftl_host = None 1218 self.aftl_pubkey = None 1219 self.aftl_apikey = None 1220 self.vbmeta_image = None 1221 self.manufacturer_key = None 1222 self.set_up_environment() 1223 1224 self.transparency_log_config = aftltool.TransparencyLogConfig( 1225 self.aftl_host, self.aftl_pubkey, self.aftl_apikey) 1226 1227 self.make_icp_default_params = { 1228 'vbmeta_image_path': self.vbmeta_image, 1229 'output': None, 1230 'signing_helper': None, 1231 'signing_helper_with_files': None, 1232 'version_incremental': '1', 1233 'transparency_log_configs': [self.transparency_log_config], 1234 'manufacturer_key': self.manufacturer_key, 1235 'padding_size': 0, 1236 'timeout': None 1237 } 1238 1239 self.info_icp_default_params = { 1240 'vbmeta_image_path': None, 1241 'output': io.StringIO() 1242 } 1243 1244 self.verify_icp_default_params = { 1245 'vbmeta_image_path': None, 1246 'transparency_log_pub_keys': [self.aftl_pubkey], 1247 'output': io.StringIO() 1248 } 1249 1250 self.load_test_aftl_default_params = { 1251 'vbmeta_image_path': self.vbmeta_image, 1252 'output': io.StringIO(), 1253 'transparency_log_config': self.transparency_log_config, 1254 'manufacturer_key': self.manufacturer_key, 1255 'process_count': 1, 1256 'submission_count': 1, 1257 'stats_filename': None, 1258 'preserve_icp_images': False, 1259 'timeout': None 1260 } 1261 1262 def set_up_environment(self): 1263 """Sets up member variables for the particular test environment. 1264 1265 This allows to have different settings and mocking for unit tests and 1266 integration tests. 1267 """ 1268 raise NotImplementedError('set_up_environment() needs to be implemented ' 1269 'by subclass.') 1270 1271 def get_aftl_implementation(self, canned_response): 1272 """Gets the aftltool.Aftl implementation used for testing. 1273 1274 This allows to have different Aftl implementations for unit tests and 1275 integration tests. 1276 1277 Arguments: 1278 canned_response: Since we are using the actual implementation and not a 1279 mock this gets ignored. 1280 1281 Raises: 1282 NotImplementedError if subclass is not implementing the method. 1283 """ 1284 raise NotImplementedError('get_aftl_implementation() needs to be' 1285 'implemented by subclass.') 1286 1287 1288class AftlTest(AftlTestCase): 1289 1290 def set_up_environment(self): 1291 """Sets up the environment for unit testing without networking.""" 1292 self.aftl_host = 'test.foo.bar:9000' 1293 self.aftl_pubkey = self.get_testdata_path('aftl_pubkey_1.pem') 1294 self.vbmeta_image = self.get_testdata_path('aftl_input_vbmeta.img') 1295 self.manufacturer_key = self.get_testdata_path('testkey_rsa4096.pem') 1296 1297 def get_aftl_implementation(self, canned_response): 1298 """Retrieves the AftlMock for unit testing without networking.""" 1299 return AftlMock(canned_response) 1300 1301 def test_get_vbmeta_image(self): 1302 """Tests the get_vbmeta_image method.""" 1303 tool = aftltool.Aftl() 1304 1305 # Valid vbmeta image without footer and AftlImage. 1306 image, footer = tool.get_vbmeta_image( 1307 self.get_testdata_path('aftl_input_vbmeta.img')) 1308 self.assertIsNotNone(image) 1309 self.assertEqual(len(image), 4352) 1310 self.assertIsNone(footer) 1311 1312 # Valid vbmeta image without footer but with AftlImage. 1313 image, footer = tool.get_vbmeta_image( 1314 self.get_testdata_path('aftl_output_vbmeta_with_1_icp.img')) 1315 self.assertIsNotNone(image) 1316 self.assertEqual(len(image), 4352) 1317 self.assertIsNone(footer) 1318 1319 # Invalid vbmeta image. 1320 image, footer = tool.get_vbmeta_image( 1321 self.get_testdata_path('large_blob.bin')) 1322 self.assertIsNone(image) 1323 self.assertIsNone(footer) 1324 1325 # Invalid file path. 1326 image, footer = tool.get_vbmeta_image( 1327 self.get_testdata_path('blabli_not_existing_file')) 1328 self.assertIsNone(image) 1329 self.assertIsNone(footer) 1330 1331 def test_get_aftl_image(self): 1332 """Tests the get_aftl_image method.""" 1333 tool = aftltool.Aftl() 1334 1335 # Valid vbmeta image without footer with AftlImage. 1336 desc = tool.get_aftl_image( 1337 self.get_testdata_path('aftl_output_vbmeta_with_1_icp.img')) 1338 self.assertIsInstance(desc, aftltool.AftlImage) 1339 1340 # Valid vbmeta image without footer and AftlImage. 1341 desc = tool.get_aftl_image( 1342 self.get_testdata_path('aftl_input_vbmeta.img')) 1343 self.assertIsNone(desc) 1344 1345 # Invalid vbmeta image. 1346 desc = tool.get_aftl_image(self.get_testdata_path('large_blob.bin')) 1347 self.assertIsNone(desc) 1348 1349 # pylint: disable=no-member 1350 def test_request_inclusion_proof(self): 1351 """Tests the request_inclusion_proof method.""" 1352 # Always work with a mock independent if run as unit or integration tests. 1353 aftl = AftlMock(self.test_avbm_resp) 1354 1355 icp = aftl.request_inclusion_proof( 1356 self.transparency_log_config, b'a' * 1024, '1', 1357 self.get_testdata_path('testkey_rsa4096.pem'), None, None, None) 1358 self.assertEqual(icp.leaf_index, 1359 self.test_avbm_resp.annotation_proof.proof.leaf_index) 1360 self.assertEqual(icp.proof_hash_count, 1361 len(self.test_avbm_resp.annotation_proof.proof.hashes)) 1362 self.assertEqual(icp.log_url, self.aftl_host) 1363 self.assertEqual( 1364 icp.log_root_descriptor.root_hash, binascii.unhexlify( 1365 '9a5f71340f8dc98bdc6320f976dda5f34db8554cb273ba5ab60f1697c519d6f6')) 1366 1367 self.assertEqual(icp.annotation_leaf.annotation.version_incremental, 1368 'only_for_testing') 1369 # To calculate the hash of the a RSA key use the following command: 1370 # openssl rsa -in test/data/testkey_rsa4096.pem -pubout \ 1371 # -outform DER | sha256sum 1372 self.assertEqual( 1373 icp.annotation_leaf.annotation.manufacturer_key_hash, 1374 bytes.fromhex( 1375 "83ab3b109b73a1d32dce4153a2de57a1a0485052db8364f3180d98614749d7f7")) 1376 1377 self.assertEqual( 1378 icp.log_root_signature, 1379 self.test_avbm_resp.annotation_proof.sth.log_root_signature) 1380 self.assertEqual( 1381 icp.proofs, 1382 self.test_avbm_resp.annotation_proof.proof.hashes) 1383 1384 # pylint: disable=no-member 1385 def test_request_inclusion_proof_failure(self): 1386 """Tests the request_inclusion_proof method in case of a comms problem.""" 1387 # Always work with a mock independent if run as unit or integration tests. 1388 aftl = AftlMock(aftltool.AftlError('Comms error')) 1389 1390 with self.assertRaises(aftltool.AftlError): 1391 aftl.request_inclusion_proof( 1392 self.transparency_log_config, b'a' * 1024, 'version_inc', 1393 self.get_testdata_path('testkey_rsa4096.pem'), None, None, None) 1394 1395 def test_request_inclusion_proof_manuf_key_not_4096(self): 1396 """Tests request_inclusion_proof with manufacturing key not of size 4096.""" 1397 # Always work with a mock independent if run as unit or integration tests. 1398 aftl = AftlMock(self.test_avbm_resp) 1399 with self.assertRaises(aftltool.AftlError) as e: 1400 aftl.request_inclusion_proof( 1401 self.transparency_log_config, b'a' * 1024, 'version_inc', 1402 self.get_testdata_path('testkey_rsa2048.pem'), None, None, None) 1403 self.assertIn('not of size 4096: 2048', str(e.exception)) 1404 1405 def test_make_and_verify_icp_with_1_log(self): 1406 """Tests make_icp_from_vbmeta, verify_image_icp & info_image_icp.""" 1407 aftl = self.get_aftl_implementation(self.test_avbm_resp) 1408 1409 # Make a VBmeta image with ICP. 1410 with tempfile.NamedTemporaryFile('wb+') as output_file: 1411 self.make_icp_default_params['output'] = output_file 1412 result = aftl.make_icp_from_vbmeta(**self.make_icp_default_params) 1413 output_file.flush() 1414 self.assertTrue(result) 1415 1416 # Checks that there is 1 ICP. 1417 aftl_image = aftl.get_aftl_image(output_file.name) 1418 self.assertEqual(aftl_image.image_header.icp_count, 1) 1419 1420 # Verifies the generated image. 1421 self.verify_icp_default_params['vbmeta_image_path'] = output_file.name 1422 result = aftl.verify_image_icp(**self.verify_icp_default_params) 1423 self.assertTrue(result) 1424 1425 # Prints the image details. 1426 self.info_icp_default_params['vbmeta_image_path'] = output_file.name 1427 result = aftl.info_image_icp(**self.info_icp_default_params) 1428 self.assertTrue(result) 1429 1430 def test_make_and_verify_icp_with_2_logs(self): 1431 """Tests make_icp_from_vbmeta, verify_image_icp & info_image_icp.""" 1432 aftl = self.get_aftl_implementation(self.test_avbm_resp) 1433 1434 # Reconfigures default parameters with two transparency logs. 1435 self.make_icp_default_params['transparency_log_configs'] = [ 1436 self.transparency_log_config, self.transparency_log_config] 1437 1438 # Make a VBmeta image with ICP. 1439 with tempfile.NamedTemporaryFile('wb+') as output_file: 1440 self.make_icp_default_params['output'] = output_file 1441 result = aftl.make_icp_from_vbmeta( 1442 **self.make_icp_default_params) 1443 output_file.flush() 1444 self.assertTrue(result) 1445 1446 # Checks that there are 2 ICPs. 1447 aftl_image = aftl.get_aftl_image(output_file.name) 1448 self.assertEqual(aftl_image.image_header.icp_count, 2) 1449 1450 # Verifies the generated image. 1451 self.verify_icp_default_params['vbmeta_image_path'] = output_file.name 1452 result = aftl.verify_image_icp(**self.verify_icp_default_params) 1453 self.assertTrue(result) 1454 1455 # Prints the image details. 1456 self.info_icp_default_params['vbmeta_image_path'] = output_file.name 1457 result = aftl.info_image_icp(**self.info_icp_default_params) 1458 self.assertTrue(result) 1459 1460 def test_info_image_icp(self): 1461 """Tests info_image_icp with vbmeta image with 2 ICP.""" 1462 # Always work with a mock independent if run as unit or integration tests. 1463 aftl = AftlMock(self.test_avbm_resp) 1464 1465 image_path = self.get_testdata_path( 1466 'aftl_output_vbmeta_with_2_icp_same_log.img') 1467 self.info_icp_default_params['vbmeta_image_path'] = image_path 1468 1469 # Verifies the generated image. 1470 result = aftl.info_image_icp(**self.info_icp_default_params) 1471 self.assertTrue(result) 1472 1473 def test_info_image_icp_fail(self): 1474 """Tests info_image_icp with invalid vbmeta image.""" 1475 # Always work with a mock independent if run as unit or integration tests. 1476 aftl = AftlMock(self.test_avbm_resp) 1477 1478 image_path = self.get_testdata_path('large_blob.bin') 1479 self.info_icp_default_params['vbmeta_image_path'] = image_path 1480 1481 # Verifies the generated image. 1482 result = aftl.info_image_icp(**self.info_icp_default_params) 1483 self.assertFalse(result) 1484 1485 def test_verify_image_icp(self): 1486 """Tets verify_image_icp with 2 ICP with all matching log keys.""" 1487 # Always work with a mock independent if run as unit or integration tests. 1488 aftl = AftlMock(self.test_avbm_resp) 1489 1490 image_path = self.get_testdata_path( 1491 'aftl_output_vbmeta_with_2_icp_same_log.img') 1492 self.verify_icp_default_params['vbmeta_image_path'] = image_path 1493 self.verify_icp_default_params['transparency_log_pub_keys'] = [ 1494 self.get_testdata_path('aftl_pubkey_1.pem'), 1495 ] 1496 1497 result = aftl.verify_image_icp(**self.verify_icp_default_params) 1498 self.assertTrue(result) 1499 1500 def test_make_icp_with_invalid_grpc_service(self): 1501 """Tests make_icp_from_vbmeta command with a host not supporting GRPC.""" 1502 aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error')) 1503 self.make_icp_default_params[ 1504 'transparency_log_configs'][0].target = 'www.google.com:80' 1505 with tempfile.NamedTemporaryFile('wb+') as output_file: 1506 self.make_icp_default_params['output'] = output_file 1507 result = aftl.make_icp_from_vbmeta( 1508 **self.make_icp_default_params) 1509 self.assertFalse(result) 1510 1511 def test_make_icp_grpc_timeout(self): 1512 """Tests make_icp_from_vbmeta command when running into GRPC timeout.""" 1513 aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error')) 1514 1515 # The timeout is set to 1 second which is way below the minimum processing 1516 # time of the transparency log per load test results in b/139407814#2 where 1517 # it was 3.43 seconds. 1518 self.make_icp_default_params['timeout'] = 1 1519 with tempfile.NamedTemporaryFile('wb+') as output_file: 1520 self.make_icp_default_params['output'] = output_file 1521 result = aftl.make_icp_from_vbmeta( 1522 **self.make_icp_default_params) 1523 self.assertFalse(result) 1524 1525 def test_load_test_single_process_single_submission(self): 1526 """Tests load_test_aftl command with 1 process which does 1 submission.""" 1527 aftl = self.get_aftl_implementation(self.test_avbm_resp) 1528 1529 with tempfile.TemporaryDirectory() as tmp_dir: 1530 self.load_test_aftl_default_params[ 1531 'stats_filename'] = os.path.join(tmp_dir, 'load_test.csv') 1532 result = aftl.load_test_aftl(**self.load_test_aftl_default_params) 1533 self.assertTrue(result) 1534 1535 output = self.load_test_aftl_default_params['output'].getvalue() 1536 self.assertRegex(output, 'Succeeded:.+?1\n') 1537 self.assertRegex(output, 'Failed:.+?0\n') 1538 1539 self.assertTrue(os.path.exists( 1540 self.load_test_aftl_default_params['stats_filename'])) 1541 1542 def test_load_test_multi_process_multi_submission(self): 1543 """Tests load_test_aftl command with 2 processes and 2 submissions each.""" 1544 aftl = self.get_aftl_implementation(self.test_avbm_resp) 1545 1546 self.load_test_aftl_default_params['process_count'] = 2 1547 self.load_test_aftl_default_params['submission_count'] = 2 1548 with tempfile.TemporaryDirectory() as tmp_dir: 1549 self.load_test_aftl_default_params[ 1550 'stats_filename'] = os.path.join(tmp_dir, 'load_test.csv') 1551 result = aftl.load_test_aftl(**self.load_test_aftl_default_params) 1552 self.assertTrue(result) 1553 1554 output = self.load_test_aftl_default_params['output'].getvalue() 1555 self.assertRegex(output, 'Succeeded:.+?4\n') 1556 self.assertRegex(output, 'Failed:.+?0\n') 1557 1558 self.assertTrue(os.path.exists( 1559 self.load_test_aftl_default_params['stats_filename'])) 1560 1561 def test_load_test_invalid_grpc_service(self): 1562 """Tests load_test_aftl command with a host that does not support GRPC.""" 1563 aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error')) 1564 1565 self.load_test_aftl_default_params[ 1566 'transparency_log_config'].target = 'www.google.com:80' 1567 result = aftl.load_test_aftl(**self.load_test_aftl_default_params) 1568 self.assertFalse(result) 1569 1570 output = self.load_test_aftl_default_params['output'].getvalue() 1571 self.assertRegex(output, 'Succeeded:.+?0\n') 1572 self.assertRegex(output, 'Failed:.+?1\n') 1573 1574 def test_load_test_grpc_timeout(self): 1575 """Tests load_test_aftl command when running into timeout.""" 1576 aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error')) 1577 1578 self.load_test_aftl_default_params['timeout'] = 1 1579 result = aftl.load_test_aftl(**self.load_test_aftl_default_params) 1580 self.assertFalse(result) 1581 1582 output = self.load_test_aftl_default_params['output'].getvalue() 1583 self.assertRegex(output, 'Succeeded:.+?0\n') 1584 self.assertRegex(output, 'Failed:.+?1\n') 1585 1586 1587class TransparencyLogConfigTestCase(unittest.TestCase): 1588 1589 def test_from_argument(self): 1590 log = aftltool.TransparencyLogConfig.from_argument( 1591 "example.com:8080,mykey.pub") 1592 self.assertEqual(log.target, "example.com:8080") 1593 self.assertEqual(log.pub_key, "mykey.pub") 1594 1595 with self.assertRaises(argparse.ArgumentTypeError): 1596 aftltool.TransparencyLogConfig.from_argument("example.com:8080,") 1597 1598 with self.assertRaises(argparse.ArgumentTypeError): 1599 aftltool.TransparencyLogConfig.from_argument(",") 1600 1601 def test_from_argument_with_api_key(self): 1602 log = aftltool.TransparencyLogConfig.from_argument( 1603 "example.com:8080,mykey.pub,Aipl29gj3x9") 1604 self.assertEqual(log.target, "example.com:8080") 1605 self.assertEqual(log.pub_key, "mykey.pub") 1606 self.assertEqual(log.api_key, "Aipl29gj3x9") 1607 1608if __name__ == '__main__': 1609 unittest.main(verbosity=2) 1610