1#!/usr/bin/env python3 2 3# Copyright 2020, The Android Open Source Project 4# 5# Permission is hereby granted, free of charge, to any person 6# obtaining a copy of this software and associated documentation 7# files (the "Software"), to deal in the Software without 8# restriction, including without limitation the rights to use, copy, 9# modify, merge, publish, distribute, sublicense, and/or sell copies 10# of the Software, and to permit persons to whom the Software is 11# furnished to do so, subject to the following conditions: 12# 13# The above copyright notice and this permission notice shall be 14# included in all copies or substantial portions of the Software. 15# 16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 20# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 21# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 22# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 23# SOFTWARE. 24# 25"""Command-line tool for AFTL support for Android Verified Boot images.""" 26 27import abc 28import argparse 29import enum 30import hashlib 31import io 32import multiprocessing 33import os 34import queue 35import struct 36import subprocess 37import sys 38import tempfile 39import time 40 41# This is to work around temporarily with the issue that python3 does not permit 42# relative imports anymore going forward. This adds the proto directory relative 43# to the location of aftltool to the sys.path. 44# TODO(b/154068467): Implement proper importing of generated *_pb2 modules. 45EXEC_PATH = os.path.dirname(os.path.realpath(__file__)) 46sys.path.append(os.path.join(EXEC_PATH, 'proto')) 47 48# pylint: disable=wrong-import-position,import-error 49import avbtool 50import api_pb2 51# pylint: enable=wrong-import-position,import-error 52 53 54class AftlError(Exception): 55 """Application-specific errors. 56 57 These errors represent issues for which a stack-trace should not be 58 presented. 59 60 Attributes: 61 message: Error message. 62 """ 63 64 def __init__(self, message): 65 Exception.__init__(self, message) 66 67 68def rsa_key_read_pem_bytes(key_path): 69 """Reads the bytes out of the passed in PEM file. 70 71 Arguments: 72 key_path: A string containing the path to the PEM file. 73 74 Returns: 75 A bytearray containing the DER encoded bytes in the PEM file. 76 77 Raises: 78 AftlError: If openssl cannot decode the PEM file. 79 """ 80 # Use openssl to decode the PEM file. 81 args = ['openssl', 'rsa', '-in', key_path, '-pubout', '-outform', 'DER'] 82 p = subprocess.Popen(args, 83 stdin=subprocess.PIPE, 84 stdout=subprocess.PIPE, 85 stderr=subprocess.PIPE) 86 (pout, perr) = p.communicate() 87 retcode = p.wait() 88 if retcode != 0: 89 raise AftlError('Error decoding: {}'.format(perr)) 90 return pout 91 92 93def check_signature(log_root, log_root_sig, 94 transparency_log_pub_key): 95 """Validates the signature provided by the transparency log. 96 97 Arguments: 98 log_root: The transparency log_root data structure. 99 log_root_sig: The signature of the transparency log_root data structure. 100 transparency_log_pub_key: The file path to the transparency log public key. 101 102 Returns: 103 True if the signature check passes, otherwise False. 104 """ 105 106 logsig_tmp = tempfile.NamedTemporaryFile() 107 logsig_tmp.write(log_root_sig) 108 logsig_tmp.flush() 109 logroot_tmp = tempfile.NamedTemporaryFile() 110 logroot_tmp.write(log_root) 111 logroot_tmp.flush() 112 113 p = subprocess.Popen(['openssl', 'dgst', '-sha256', '-verify', 114 transparency_log_pub_key, 115 '-signature', logsig_tmp.name, logroot_tmp.name], 116 stdin=subprocess.PIPE, 117 stdout=subprocess.PIPE, 118 stderr=subprocess.PIPE) 119 120 p.communicate() 121 retcode = p.wait() 122 if not retcode: 123 return True 124 return False 125 126 127# AFTL Merkle Tree Functionality 128def rfc6962_hash_leaf(leaf): 129 """RFC6962 hashing function for hashing leaves of a Merkle tree. 130 131 Arguments: 132 leaf: A bytearray containing the Merkle tree leaf to be hashed. 133 134 Returns: 135 A bytearray containing the RFC6962 SHA256 hash of the leaf. 136 """ 137 hasher = hashlib.sha256() 138 # RFC6962 states a '0' byte should be prepended to the data. 139 # This is done in conjunction with the '1' byte for non-leaf 140 # nodes for 2nd preimage attack resistance. 141 hasher.update(b'\x00') 142 hasher.update(leaf) 143 return hasher.digest() 144 145 146def rfc6962_hash_children(l, r): 147 """Calculates the inner Merkle tree node hash of child nodes l and r. 148 149 Arguments: 150 l: A bytearray containing the left child node to be hashed. 151 r: A bytearray containing the right child node to be hashed. 152 153 Returns: 154 A bytearray containing the RFC6962 SHA256 hash of 1|l|r. 155 """ 156 hasher = hashlib.sha256() 157 # RFC6962 states a '1' byte should be prepended to the concatenated data. 158 # This is done in conjunction with the '0' byte for leaf 159 # nodes for 2nd preimage attack resistance. 160 hasher.update(b'\x01') 161 hasher.update(l) 162 hasher.update(r) 163 return hasher.digest() 164 165 166def chain_border_right(seed, proof): 167 """Computes a subtree hash along the left-side tree border. 168 169 Arguments: 170 seed: A bytearray containing the starting hash. 171 proof: A list of bytearrays representing the hashes in the inclusion proof. 172 173 Returns: 174 A bytearray containing the left-side subtree hash. 175 """ 176 for h in proof: 177 seed = rfc6962_hash_children(h, seed) 178 return seed 179 180 181def chain_inner(seed, proof, leaf_index): 182 """Computes a subtree hash on or below the tree's right border. 183 184 Arguments: 185 seed: A bytearray containing the starting hash. 186 proof: A list of bytearrays representing the hashes in the inclusion proof. 187 leaf_index: The current leaf index. 188 189 Returns: 190 A bytearray containing the subtree hash. 191 """ 192 for i, h in enumerate(proof): 193 if leaf_index >> i & 1 == 0: 194 seed = rfc6962_hash_children(seed, h) 195 else: 196 seed = rfc6962_hash_children(h, seed) 197 return seed 198 199 200def root_from_icp(leaf_index, tree_size, proof, leaf_hash): 201 """Calculates the expected Merkle tree root hash. 202 203 Arguments: 204 leaf_index: The current leaf index. 205 tree_size: The number of nodes in the Merkle tree. 206 proof: A list of bytearrays containing the inclusion proof. 207 leaf_hash: A bytearray containing the initial leaf hash. 208 209 Returns: 210 A bytearray containing the calculated Merkle tree root hash. 211 212 Raises: 213 AftlError: If invalid parameters are passed in. 214 """ 215 if leaf_index < 0: 216 raise AftlError('Invalid leaf_index value: {}'.format(leaf_index)) 217 if tree_size < 0: 218 raise AftlError('Invalid tree_size value: {}'.format(tree_size)) 219 if leaf_index >= tree_size: 220 err_str = 'leaf_index cannot be equal or larger than tree_size: {}, {}' 221 raise AftlError(err_str.format(leaf_index, tree_size)) 222 if proof is None: 223 raise AftlError('Inclusion proof not provided.') 224 if leaf_hash is None: 225 raise AftlError('No leaf hash provided.') 226 # Calculate the point to split the proof into two parts. 227 # The split is where the paths to leaves diverge. 228 inner = (leaf_index ^ (tree_size - 1)).bit_length() 229 result = chain_inner(leaf_hash, proof[:inner], leaf_index) 230 result = chain_border_right(result, proof[inner:]) 231 return result 232 233 234class AftlImageHeader(object): 235 """A class for representing the AFTL image header. 236 237 Attributes: 238 magic: Magic for identifying the AftlImage. 239 required_icp_version_major: The major version of AVB that wrote the entry. 240 required_icp_version_minor: The minor version of AVB that wrote the entry. 241 aftl_image_size: Total size of the AftlImage. 242 icp_count: Number of inclusion proofs represented in this structure. 243 """ 244 245 SIZE = 18 # The size of the structure, in bytes 246 MAGIC = b'AFTL' 247 FORMAT_STRING = ('!4s2L' # magic, major & minor version. 248 'L' # AFTL image size. 249 'H') # number of inclusion proof entries. 250 251 def __init__(self, data=None): 252 """Initializes a new AftlImageHeader object. 253 254 Arguments: 255 data: If not None, must be a bytearray of size |SIZE|. 256 257 Raises: 258 AftlError: If invalid structure for AftlImageHeader. 259 """ 260 assert struct.calcsize(self.FORMAT_STRING) == self.SIZE 261 262 if data: 263 (self.magic, self.required_icp_version_major, 264 self.required_icp_version_minor, self.aftl_image_size, 265 self.icp_count) = struct.unpack(self.FORMAT_STRING, data) 266 else: 267 self.magic = self.MAGIC 268 self.required_icp_version_major = avbtool.AVB_VERSION_MAJOR 269 self.required_icp_version_minor = avbtool.AVB_VERSION_MINOR 270 self.aftl_image_size = self.SIZE 271 self.icp_count = 0 272 if not self.is_valid(): 273 raise AftlError('Invalid structure for AftlImageHeader.') 274 275 def encode(self): 276 """Serializes the AftlImageHeader |SIZE| to bytes. 277 278 Returns: 279 The encoded AftlImageHeader as bytes. 280 281 Raises: 282 AftlError: If invalid structure for AftlImageHeader. 283 """ 284 if not self.is_valid(): 285 raise AftlError('Invalid structure for AftlImageHeader') 286 return struct.pack(self.FORMAT_STRING, self.magic, 287 self.required_icp_version_major, 288 self.required_icp_version_minor, 289 self.aftl_image_size, 290 self.icp_count) 291 292 def is_valid(self): 293 """Ensures that values in the AftlImageHeader are sane. 294 295 Returns: 296 True if the values in the AftlImageHeader are sane, False otherwise. 297 """ 298 if self.magic != AftlImageHeader.MAGIC: 299 sys.stderr.write( 300 'AftlImageHeader: magic value mismatch: {}\n' 301 .format(repr(self.magic))) 302 return False 303 304 if self.required_icp_version_major > avbtool.AVB_VERSION_MAJOR: 305 sys.stderr.write('AftlImageHeader: major version mismatch: {}\n'.format( 306 self.required_icp_version_major)) 307 return False 308 309 if self.required_icp_version_minor > avbtool.AVB_VERSION_MINOR: 310 sys.stderr.write('AftlImageHeader: minor version mismatch: {}\n'.format( 311 self.required_icp_version_minor)) 312 return False 313 314 if self.aftl_image_size < self.SIZE: 315 sys.stderr.write('AftlImageHeader: Invalid AFTL image size: {}\n'.format( 316 self.aftl_image_size)) 317 return False 318 319 if self.icp_count < 0 or self.icp_count > 65535: 320 sys.stderr.write( 321 'AftlImageHeader: ICP entry count out of range: {}\n'.format( 322 self.icp_count)) 323 return False 324 return True 325 326 def print_desc(self, o): 327 """Print the AftlImageHeader. 328 329 Arguments: 330 o: The object to write the output to. 331 """ 332 o.write(' AFTL image header:\n') 333 i = ' ' * 4 334 fmt = '{}{:25}{}\n' 335 o.write(fmt.format(i, 'Major version:', self.required_icp_version_major)) 336 o.write(fmt.format(i, 'Minor version:', self.required_icp_version_minor)) 337 o.write(fmt.format(i, 'Image size:', self.aftl_image_size)) 338 o.write(fmt.format(i, 'ICP entries count:', self.icp_count)) 339 340 341class AftlIcpEntry(object): 342 """A class for the transparency log inclusion proof entries. 343 344 The data that represents each of the components of the ICP entry are stored 345 immediately following the ICP entry header. The format is log_url, 346 SignedLogRoot, and inclusion proof hashes. 347 348 Attributes: 349 log_url_size: Length of the string representing the transparency log URL. 350 leaf_index: Leaf index in the transparency log representing this entry. 351 log_root_descriptor_size: Size of the transparency log's SignedLogRoot. 352 annotation_leaf_size: Size of the SignedVBMetaPrimaryAnnotationLeaf passed 353 to the log. 354 log_root_sig_size: Size in bytes of the log_root_signature 355 proof_hash_count: Number of hashes comprising the inclusion proof. 356 inc_proof_size: The total size of the inclusion proof, in bytes. 357 log_url: The URL for the transparency log that generated this inclusion 358 proof. 359 log_root_descriptor: The data comprising the signed tree head structure. 360 annotation_leaf: The data comprising the SignedVBMetaPrimaryAnnotationLeaf 361 leaf. 362 log_root_signature: The data comprising the log root signature. 363 proofs: The hashes comprising the inclusion proof. 364 365 """ 366 SIZE = 27 # The size of the structure, in bytes 367 FORMAT_STRING = ('!L' # transparency log server url size 368 'Q' # leaf index 369 'L' # log root descriptor size 370 'L' # firmware info leaf size 371 'H' # log root signature size 372 'B' # number of hashes in the inclusion proof 373 'L') # size of the inclusion proof in bytes 374 # This header is followed by the log_url, log_root_descriptor, 375 # annotation leaf, log root signature, and the proofs elements. 376 377 def __init__(self, data=None): 378 """Initializes a new ICP entry object. 379 380 Arguments: 381 data: If not None, must be a bytearray of size >= |SIZE|. 382 383 Raises: 384 AftlError: If data does not represent a well-formed AftlIcpEntry. 385 """ 386 # Assert the header structure is of a sane size. 387 assert struct.calcsize(self.FORMAT_STRING) == self.SIZE 388 389 if data: 390 # Deserialize the header from the data. 391 (self._log_url_size_expected, 392 self.leaf_index, 393 self._log_root_descriptor_size_expected, 394 self._annotation_leaf_size_expected, 395 self._log_root_sig_size_expected, 396 self._proof_hash_count_expected, 397 self._inc_proof_size_expected) = struct.unpack(self.FORMAT_STRING, 398 data[0:self.SIZE]) 399 400 # Deserialize ICP entry components from the data. 401 expected_format_string = '{}s{}s{}s{}s{}s'.format( 402 self._log_url_size_expected, 403 self._log_root_descriptor_size_expected, 404 self._annotation_leaf_size_expected, 405 self._log_root_sig_size_expected, 406 self._inc_proof_size_expected) 407 408 (log_url, log_root_descriptor_bytes, annotation_leaf_bytes, 409 self.log_root_signature, proof_bytes) = struct.unpack( 410 expected_format_string, data[self.SIZE:self.get_expected_size()]) 411 412 self.log_url = log_url.decode('ascii') 413 self.log_root_descriptor = TrillianLogRootDescriptor( 414 log_root_descriptor_bytes) 415 416 self.annotation_leaf = SignedVBMetaPrimaryAnnotationLeaf.parse( 417 annotation_leaf_bytes) 418 419 self.proofs = [] 420 if self._proof_hash_count_expected > 0: 421 proof_idx = 0 422 hash_size = (self._inc_proof_size_expected 423 // self._proof_hash_count_expected) 424 for _ in range(self._proof_hash_count_expected): 425 proof = proof_bytes[proof_idx:(proof_idx+hash_size)] 426 self.proofs.append(proof) 427 proof_idx += hash_size 428 else: 429 self.leaf_index = 0 430 self.log_url = '' 431 self.log_root_descriptor = TrillianLogRootDescriptor() 432 self.annotation_leaf = SignedVBMetaPrimaryAnnotationLeaf() 433 self.log_root_signature = b'' 434 self.proofs = [] 435 if not self.is_valid(): 436 raise AftlError('Invalid structure for AftlIcpEntry') 437 438 @property 439 def log_url_size(self): 440 """Gets the size of the log_url attribute.""" 441 if hasattr(self, 'log_url'): 442 return len(self.log_url) 443 return self._log_url_size_expected 444 445 @property 446 def log_root_descriptor_size(self): 447 """Gets the size of the log_root_descriptor attribute.""" 448 if hasattr(self, 'log_root_descriptor'): 449 return self.log_root_descriptor.get_expected_size() 450 return self._log_root_descriptor_size_expected 451 452 @property 453 def annotation_leaf_size(self): 454 """Gets the size of the annotation_leaf attribute.""" 455 if hasattr(self, 'annotation_leaf'): 456 return self.annotation_leaf.get_expected_size() 457 return self._annotation_leaf_size_expected 458 459 @property 460 def log_root_sig_size(self): 461 """Gets the size of the log_root signature.""" 462 if hasattr(self, 'log_root_signature'): 463 return len(self.log_root_signature) 464 return self._log_root_sig_size_expected 465 466 @property 467 def proof_hash_count(self): 468 """Gets the number of proof hashes.""" 469 if hasattr(self, 'proofs'): 470 return len(self.proofs) 471 return self._proof_hash_count_expected 472 473 @property 474 def inc_proof_size(self): 475 """Gets the total size of the proof hashes in bytes.""" 476 if hasattr(self, 'proofs'): 477 result = 0 478 for proof in self.proofs: 479 result += len(proof) 480 return result 481 return self._inc_proof_size_expected 482 483 def verify_icp(self, transparency_log_pub_key): 484 """Verifies the contained inclusion proof given the public log key. 485 486 Arguments: 487 transparency_log_pub_key: The path to the trusted public key for the log. 488 489 Returns: 490 True if the calculated signature matches AftlIcpEntry's. False otherwise. 491 """ 492 if not transparency_log_pub_key: 493 return False 494 495 leaf_hash = rfc6962_hash_leaf(self.annotation_leaf.encode()) 496 calc_root = root_from_icp(self.leaf_index, 497 self.log_root_descriptor.tree_size, 498 self.proofs, 499 leaf_hash) 500 if ((calc_root == self.log_root_descriptor.root_hash) and 501 check_signature( 502 self.log_root_descriptor.encode(), 503 self.log_root_signature, 504 transparency_log_pub_key)): 505 return True 506 return False 507 508 def verify_vbmeta_image(self, vbmeta_image, transparency_log_pub_key): 509 """Verify the inclusion proof for the given VBMeta image. 510 511 Arguments: 512 vbmeta_image: A bytearray with the VBMeta image. 513 transparency_log_pub_key: File path to the PEM file containing the trusted 514 transparency log public key. 515 516 Returns: 517 True if the inclusion proof validates and the vbmeta hash of the given 518 VBMeta image matches the one in the annotation leaf; otherwise False. 519 """ 520 if not vbmeta_image: 521 return False 522 523 # Calculate the hash of the vbmeta image. 524 vbmeta_hash = hashlib.sha256(vbmeta_image).digest() 525 526 # Validates the inclusion proof and then compare the calculated vbmeta_hash 527 # against the one in the inclusion proof. 528 return (self.verify_icp(transparency_log_pub_key) 529 and self.annotation_leaf.annotation.vbmeta_hash == vbmeta_hash) 530 531 def encode(self): 532 """Serializes the header |SIZE| and data to bytes. 533 534 Returns: 535 bytes with the encoded header. 536 537 Raises: 538 AftlError: If invalid entry structure. 539 """ 540 proof_bytes = bytearray() 541 if not self.is_valid(): 542 raise AftlError('Invalid AftlIcpEntry structure') 543 544 expected_format_string = '{}{}s{}s{}s{}s{}s'.format( 545 self.FORMAT_STRING, 546 self.log_url_size, 547 self.log_root_descriptor_size, 548 self.annotation_leaf_size, 549 self.log_root_sig_size, 550 self.inc_proof_size) 551 552 for proof in self.proofs: 553 proof_bytes.extend(proof) 554 555 return struct.pack(expected_format_string, 556 self.log_url_size, self.leaf_index, 557 self.log_root_descriptor_size, self.annotation_leaf_size, 558 self.log_root_sig_size, self.proof_hash_count, 559 self.inc_proof_size, self.log_url.encode('ascii'), 560 self.log_root_descriptor.encode(), 561 self.annotation_leaf.encode(), 562 self.log_root_signature, 563 proof_bytes) 564 565 def translate_response(self, log_url, avbm_response): 566 """Translates an AddVBMetaResponse object to an AftlIcpEntry. 567 568 Arguments: 569 log_url: String representing the transparency log URL. 570 avbm_response: The AddVBMetaResponse object to translate. 571 """ 572 self.log_url = log_url 573 574 # Deserializes from AddVBMetaResponse. 575 proof = avbm_response.annotation_proof 576 self.leaf_index = proof.proof.leaf_index 577 self.log_root_descriptor = TrillianLogRootDescriptor(proof.sth.log_root) 578 self.annotation_leaf = SignedVBMetaPrimaryAnnotationLeaf.parse( 579 avbm_response.annotation_leaf) 580 self.log_root_signature = proof.sth.log_root_signature 581 self.proofs = proof.proof.hashes 582 583 def get_expected_size(self): 584 """Gets the expected size of the full entry out of the header. 585 586 Returns: 587 The expected size of the AftlIcpEntry from the header. 588 """ 589 return (self.SIZE + self.log_url_size + self.log_root_descriptor_size + 590 self.annotation_leaf_size + self.log_root_sig_size + 591 self.inc_proof_size) 592 593 def is_valid(self): 594 """Ensures that values in an AftlIcpEntry structure are sane. 595 596 Returns: 597 True if the values in the AftlIcpEntry are sane, False otherwise. 598 """ 599 if self.leaf_index < 0: 600 sys.stderr.write('ICP entry: leaf index out of range: ' 601 '{}.\n'.format(self.leaf_index)) 602 return False 603 604 if (not self.log_root_descriptor or 605 not isinstance(self.log_root_descriptor, TrillianLogRootDescriptor) or 606 not self.log_root_descriptor.is_valid()): 607 sys.stderr.write('ICP entry: invalid TrillianLogRootDescriptor.\n') 608 return False 609 610 if (not self.annotation_leaf or 611 not isinstance(self.annotation_leaf, Leaf)): 612 sys.stderr.write('ICP entry: invalid Leaf.\n') 613 return False 614 return True 615 616 def print_desc(self, o): 617 """Print the ICP entry. 618 619 Arguments: 620 o: The object to write the output to. 621 """ 622 i = ' ' * 4 623 fmt = '{}{:25}{}\n' 624 o.write(fmt.format(i, 'Transparency Log:', self.log_url)) 625 o.write(fmt.format(i, 'Leaf index:', self.leaf_index)) 626 o.write(' ICP hashes: ') 627 for i, proof_hash in enumerate(self.proofs): 628 if i != 0: 629 o.write(' ' * 29) 630 o.write('{}\n'.format(proof_hash.hex())) 631 self.log_root_descriptor.print_desc(o) 632 self.annotation_leaf.print_desc(o) 633 634 635class TrillianLogRootDescriptor(object): 636 """A class representing the Trillian log_root descriptor. 637 638 Taken from Trillian definitions: 639 https://github.com/google/trillian/blob/master/trillian.proto#L255 640 641 Attributes: 642 version: The version number of the descriptor. Currently only version=1 is 643 supported. 644 tree_size: The size of the tree. 645 root_hash_size: The size of the root hash in bytes. Valid values are between 646 0 and 128. 647 root_hash: The root hash as bytearray(). 648 timestamp: The timestamp in nanoseconds. 649 revision: The revision number as long. 650 metadata_size: The size of the metadata in bytes. Valid values are between 651 0 and 65535. 652 metadata: The metadata as bytearray(). 653 """ 654 FORMAT_STRING_PART_1 = ('!H' # version 655 'Q' # tree_size 656 'B' # root_hash_size 657 ) 658 659 FORMAT_STRING_PART_2 = ('!Q' # timestamp 660 'Q' # revision 661 'H' # metadata_size 662 ) 663 664 def __init__(self, data=None): 665 """Initializes a new TrillianLogRoot descriptor.""" 666 if data: 667 # Parses first part of the log_root descriptor. 668 data_length = struct.calcsize(self.FORMAT_STRING_PART_1) 669 (self.version, self.tree_size, self.root_hash_size) = struct.unpack( 670 self.FORMAT_STRING_PART_1, data[0:data_length]) 671 data = data[data_length:] 672 673 # Parses the root_hash bytes if the size indicates existance. 674 if self.root_hash_size > 0: 675 self.root_hash = data[0:self.root_hash_size] 676 data = data[self.root_hash_size:] 677 else: 678 self.root_hash = b'' 679 680 # Parses second part of the log_root descriptor. 681 data_length = struct.calcsize(self.FORMAT_STRING_PART_2) 682 (self.timestamp, self.revision, self.metadata_size) = struct.unpack( 683 self.FORMAT_STRING_PART_2, data[0:data_length]) 684 data = data[data_length:] 685 686 # Parses the metadata if the size indicates existance. 687 if self.metadata_size > 0: 688 self.metadata = data[0:self.metadata_size] 689 else: 690 self.metadata = b'' 691 else: 692 self.version = 1 693 self.tree_size = 0 694 self.root_hash_size = 0 695 self.root_hash = b'' 696 self.timestamp = 0 697 self.revision = 0 698 self.metadata_size = 0 699 self.metadata = b'' 700 701 if not self.is_valid(): 702 raise AftlError('Invalid structure for TrillianLogRootDescriptor.') 703 704 def get_expected_size(self): 705 """Calculates the expected size of the TrillianLogRootDescriptor. 706 707 Returns: 708 The expected size of the TrillianLogRootDescriptor. 709 """ 710 return (struct.calcsize(self.FORMAT_STRING_PART_1) + self.root_hash_size + 711 struct.calcsize(self.FORMAT_STRING_PART_2) + self.metadata_size) 712 713 def encode(self): 714 """Serializes the TrillianLogDescriptor to a bytearray(). 715 716 Returns: 717 A bytearray() with the encoded header. 718 719 Raises: 720 AftlError: If invalid entry structure. 721 """ 722 if not self.is_valid(): 723 raise AftlError('Invalid structure for TrillianLogRootDescriptor.') 724 725 expected_format_string = '{}{}s{}{}s'.format( 726 self.FORMAT_STRING_PART_1, 727 self.root_hash_size, 728 self.FORMAT_STRING_PART_2[1:], 729 self.metadata_size) 730 731 return struct.pack(expected_format_string, 732 self.version, self.tree_size, self.root_hash_size, 733 self.root_hash, self.timestamp, self.revision, 734 self.metadata_size, self.metadata) 735 736 def is_valid(self): 737 """Ensures that values in the descritor are sane. 738 739 Returns: 740 True if the values are sane; otherwise False. 741 """ 742 cls = self.__class__.__name__ 743 if self.version != 1: 744 sys.stderr.write('{}: Bad version value {}.\n'.format(cls, self.version)) 745 return False 746 if self.tree_size < 0: 747 sys.stderr.write('{}: Bad tree_size value {}.\n'.format(cls, 748 self.tree_size)) 749 return False 750 if self.root_hash_size < 0 or self.root_hash_size > 128: 751 sys.stderr.write('{}: Bad root_hash_size value {}.\n'.format( 752 cls, self.root_hash_size)) 753 return False 754 if len(self.root_hash) != self.root_hash_size: 755 sys.stderr.write('{}: root_hash_size {} does not match with length of ' 756 'root_hash {}.\n'.format(cls, self.root_hash_size, 757 len(self.root_hash))) 758 return False 759 if self.timestamp < 0: 760 sys.stderr.write('{}: Bad timestamp value {}.\n'.format(cls, 761 self.timestamp)) 762 return False 763 if self.revision < 0: 764 sys.stderr.write('{}: Bad revision value {}.\n'.format(cls, 765 self.revision)) 766 return False 767 if self.metadata_size < 0 or self.metadata_size > 65535: 768 sys.stderr.write('{}: Bad metadatasize value {}.\n'.format( 769 cls, self.metadata_size)) 770 return False 771 if len(self.metadata) != self.metadata_size: 772 sys.stderr.write('{}: metadata_size {} does not match with length of' 773 'metadata {}.\n'.format(cls, self.metadata_size, 774 len(self.metadata))) 775 return False 776 return True 777 778 def print_desc(self, o): 779 """Print the TrillianLogRootDescriptor. 780 781 Arguments: 782 o: The object to write the output to. 783 """ 784 o.write(' Log Root Descriptor:\n') 785 i = ' ' * 6 786 fmt = '{}{:23}{}\n' 787 o.write(fmt.format(i, 'Version:', self.version)) 788 o.write(fmt.format(i, 'Tree size:', self.tree_size)) 789 o.write(fmt.format(i, 'Root hash size:', self.root_hash_size)) 790 if self.root_hash_size > 0: 791 o.write(fmt.format(i, 'Root hash:', self.root_hash.hex())) 792 o.write(fmt.format(i, 'Timestamp (ns):', self.timestamp)) 793 o.write(fmt.format(i, 'Revision:', self.revision)) 794 o.write(fmt.format(i, 'Metadata size:', self.metadata_size)) 795 if self.metadata_size > 0: 796 o.write(fmt.format(i, 'Metadata:', self.metadata.hex())) 797 798 799def tls_decode_bytes(byte_size, stream): 800 """Decodes a variable-length vector. 801 802 In the TLS presentation language, a variable-length vector is a pair 803 (size, value). |size| describes the size of the |value| to read 804 in bytes. All values are encoded in big-endian. 805 See https://tools.ietf.org/html/rfc8446#section-3 for more details. 806 807 Arguments: 808 byte_size: A format character as described in the struct module 809 which describes the expected length of the size. For 810 instance, "B", "H", "L" or "Q". 811 stream: a BytesIO which contains the value to decode. 812 813 Returns: 814 A bytes containing the value decoded. 815 816 Raises: 817 AftlError: If |byte_size| is not a known format character, or if not 818 enough data is available to decode the size or the value. 819 """ 820 byte_size_format = "!" + byte_size 821 try: 822 byte_size_length = struct.calcsize(byte_size_format) 823 except struct.error: 824 raise AftlError("Invalid byte_size character: {}. It must be a " 825 "format supported by struct.".format(byte_size)) 826 try: 827 value_size = struct.unpack(byte_size_format, 828 stream.read(byte_size_length))[0] 829 except struct.error: 830 raise AftlError("Not enough data to read size: {}".format(byte_size)) 831 value = stream.read(value_size) 832 if value_size != len(value): 833 raise AftlError("Not enough data to read value: " 834 "{} != {}".format(value_size, len(value))) 835 return value 836 837def tls_encode_bytes(byte_size, value, stream): 838 """Encodes a variable-length vector. 839 840 In the TLS presentation language, a variable-length vector is a pair 841 (size, value). |size| describes the size of the |value| to read 842 in bytes. All values are encoded in big-endian. 843 See https://tools.ietf.org/html/rfc8446#section-3 for more details. 844 845 Arguments: 846 byte_size: A format character as described in the struct module 847 which describes the expected length of the size. For 848 instance, "B", "H", "L" or "Q". 849 value: the value to encode. The length of |value| must be 850 representable with |byte_size|. 851 stream: a BytesIO to which the value is encoded to. 852 853 Raises: 854 AftlError: If |byte_size| is not a known format character, or if 855 |value|'s length cannot be represent with |byte_size|. 856 """ 857 byte_size_format = "!" + byte_size 858 try: 859 stream.write(struct.pack(byte_size_format, len(value))) 860 except struct.error: 861 # Whether byte_size is invalid or not large enough to represent value, 862 # struct returns an struct.error exception. Instead of matching on the 863 # exception message, capture both cases in a generic message. 864 raise AftlError("Invalid byte_size to store {} bytes".format(len(value))) 865 stream.write(value) 866 867class HashAlgorithm(enum.Enum): 868 SHA256 = 0 869 870class SignatureAlgorithm(enum.Enum): 871 RSA = 0 872 ECDSA = 1 873 874class Signature(object): 875 """Represents a signature of some data. 876 877 It is usually made using a manufacturer key and used to sign part of a leaf 878 that belongs to the transparency log. The encoding of this structure must 879 match the server expectation. 880 881 Attributes: 882 hash_algorithm: the HashAlgorithm used for the signature. 883 signature_algorithm: the SignatureAlgorithm used. 884 signature: the raw signature in bytes. 885 """ 886 FORMAT_STRING = ('!B' # Hash algorithm 887 'B' # Signing algorithm 888 ) 889 # Followed by the raw signature, encoded as a TLS variable-length vector 890 # which size is represented using 2 bytes. 891 892 def __init__(self, hash_algorithm=HashAlgorithm.SHA256, 893 signature_algorithm=SignatureAlgorithm.RSA, signature=b''): 894 self.hash_algorithm = hash_algorithm 895 self.signature_algorithm = signature_algorithm 896 self.signature = signature 897 898 @classmethod 899 def parse(cls, stream): 900 """Parses a TLS-encoded structure and returns a new Signature. 901 902 Arguments: 903 stream: a BytesIO to read the signature from. 904 905 Returns: 906 A new Signature object. 907 908 Raises: 909 AftlError: If the hash algorithm or signature algorithm value is 910 unknown; or if the decoding failed. 911 """ 912 data_length = struct.calcsize(cls.FORMAT_STRING) 913 (hash_algorithm, signature_algorithm) = struct.unpack( 914 cls.FORMAT_STRING, stream.read(data_length)) 915 try: 916 hash_algorithm = HashAlgorithm(hash_algorithm) 917 except ValueError: 918 raise AftlError('unknown hash algorithm: {}'.format(hash_algorithm)) 919 try: 920 signature_algorithm = SignatureAlgorithm(signature_algorithm) 921 except ValueError: 922 raise AftlError('unknown signature algorithm: {}'.format( 923 signature_algorithm)) 924 signature = tls_decode_bytes('H', stream) 925 return Signature(hash_algorithm, signature_algorithm, signature) 926 927 def get_expected_size(self): 928 """Returns the size of the encoded Signature.""" 929 return struct.calcsize(self.FORMAT_STRING) + \ 930 struct.calcsize('H') + len(self.signature) 931 932 def encode(self, stream): 933 """Encodes the Signature. 934 935 Arguments: 936 stream: a BytesIO to which the signature is written. 937 """ 938 stream.write(struct.pack(self.FORMAT_STRING, self.hash_algorithm.value, 939 self.signature_algorithm.value)) 940 tls_encode_bytes('H', self.signature, stream) 941 942class VBMetaPrimaryAnnotation(object): 943 """An annotation that contains metadata about a VBMeta image. 944 945 Attributes: 946 vbmeta_hash: the SHA256 of the VBMeta it references. 947 version_incremental: the version incremental of the build, as string. 948 manufacturer_key_hash: the hash of the manufacturer key that will 949 sign this annotation. 950 description: a free-form field. 951 """ 952 953 def __init__(self, vbmeta_hash=b'', version_incremental='', 954 manufacturer_key_hash=b'', description=''): 955 """Default constructor.""" 956 self.vbmeta_hash = vbmeta_hash 957 self.version_incremental = version_incremental 958 self.manufacturer_key_hash = manufacturer_key_hash 959 self.description = description 960 961 @classmethod 962 def parse(cls, stream): 963 """Parses a VBMetaPrimaryAnnotation from data. 964 965 Arguments: 966 stream: an io.BytesIO to decode the annotation from. 967 968 Returns: 969 A new VBMetaPrimaryAnnotation. 970 971 Raises: 972 AftlError: If an error occured while parsing the annotation. 973 """ 974 vbmeta_hash = tls_decode_bytes("B", stream) 975 version_incremental = tls_decode_bytes("B", stream) 976 try: 977 version_incremental = version_incremental.decode("ascii") 978 except UnicodeError: 979 raise AftlError('Failed to convert version incremental to an ASCII' 980 'string') 981 manufacturer_key_hash = tls_decode_bytes("B", stream) 982 description = tls_decode_bytes("H", stream) 983 try: 984 description = description.decode("utf-8") 985 except UnicodeError: 986 raise AftlError('Failed to convert description to an UTF-8 string') 987 return cls(vbmeta_hash, version_incremental, manufacturer_key_hash, 988 description) 989 990 def sign(self, manufacturer_key_path, signing_helper=None, 991 signing_helper_with_files=None): 992 """Signs the annotation. 993 994 Arguments: 995 manufacturer_key_path: Path to key used to sign messages sent to the 996 transparency log servers. 997 signing_helper: Program which signs a hash and returns a signature. 998 signing_helper_with_files: Same as signing_helper but uses files instead. 999 1000 Returns: 1001 A new SignedVBMetaPrimaryAnnotation. 1002 1003 Raises: 1004 AftlError: If an error occured while signing the annotation. 1005 """ 1006 # AFTL supports SHA256_RSA4096 for now, more will be available. 1007 algorithm_name = 'SHA256_RSA4096' 1008 encoded_leaf = io.BytesIO() 1009 self.encode(encoded_leaf) 1010 try: 1011 rsa_key = avbtool.RSAPublicKey(manufacturer_key_path) 1012 raw_signature = rsa_key.sign(algorithm_name, encoded_leaf.getvalue(), 1013 signing_helper, signing_helper_with_files) 1014 except avbtool.AvbError as e: 1015 raise AftlError('Failed to sign VBMetaPrimaryAnnotation with ' 1016 '--manufacturer_key: {}'.format(e)) 1017 signature = Signature(hash_algorithm=HashAlgorithm.SHA256, 1018 signature_algorithm=SignatureAlgorithm.RSA, 1019 signature=raw_signature) 1020 return SignedVBMetaPrimaryAnnotation(signature=signature, annotation=self) 1021 1022 def encode(self, stream): 1023 """Encodes the VBMetaPrimaryAnnotation. 1024 1025 Arguments: 1026 stream: a BytesIO to which the signature is written. 1027 1028 Raises: 1029 AftlError: If the encoding failed. 1030 """ 1031 tls_encode_bytes("B", self.vbmeta_hash, stream) 1032 try: 1033 tls_encode_bytes("B", self.version_incremental.encode("ascii"), stream) 1034 except UnicodeError: 1035 raise AftlError('Unable to encode version incremental to ASCII') 1036 tls_encode_bytes("B", self.manufacturer_key_hash, stream) 1037 try: 1038 tls_encode_bytes("H", self.description.encode("utf-8"), stream) 1039 except UnicodeError: 1040 raise AftlError('Unable to encode description to UTF-8') 1041 1042 def get_expected_size(self): 1043 """Returns the size of the encoded annotation.""" 1044 b = io.BytesIO() 1045 self.encode(b) 1046 return len(b.getvalue()) 1047 1048 def print_desc(self, o): 1049 """Print the VBMetaPrimaryAnnotation. 1050 1051 Arguments: 1052 o: The object to write the output to. 1053 """ 1054 o.write(' VBMeta Primary Annotation:\n') 1055 i = ' ' * 8 1056 fmt = '{}{:23}{}\n' 1057 if self.vbmeta_hash: 1058 o.write(fmt.format(i, 'VBMeta hash:', self.vbmeta_hash.hex())) 1059 if self.version_incremental: 1060 o.write(fmt.format(i, 'Version incremental:', self.version_incremental)) 1061 if self.manufacturer_key_hash: 1062 o.write(fmt.format(i, 'Manufacturer key hash:', 1063 self.manufacturer_key_hash.hex())) 1064 if self.description: 1065 o.write(fmt.format(i, 'Description:', self.description)) 1066 1067 1068class SignedVBMetaPrimaryAnnotation(object): 1069 """A Signed VBMetaPrimaryAnnotation. 1070 1071 Attributes: 1072 signature: a Signature. 1073 annotation: a VBMetaPrimaryAnnotation. 1074 """ 1075 1076 def __init__(self, signature=None, annotation=None): 1077 """Default constructor.""" 1078 if not signature: 1079 signature = Signature() 1080 self.signature = signature 1081 if not annotation: 1082 annotation = VBMetaPrimaryAnnotation() 1083 self.annotation = annotation 1084 1085 @classmethod 1086 def parse(cls, stream): 1087 """Parses a signed annotation.""" 1088 signature = Signature.parse(stream) 1089 annotation = VBMetaPrimaryAnnotation.parse(stream) 1090 return cls(signature, annotation) 1091 1092 def get_expected_size(self): 1093 """Returns the size of the encoded signed annotation.""" 1094 return self.signature.get_expected_size() + \ 1095 self.annotation.get_expected_size() 1096 1097 def encode(self, stream): 1098 """Encodes the SignedVBMetaPrimaryAnnotation. 1099 1100 Arguments: 1101 stream: a BytesIO to which the object is written. 1102 1103 Raises: 1104 AftlError: If the encoding failed. 1105 """ 1106 self.signature.encode(stream) 1107 self.annotation.encode(stream) 1108 1109 def print_desc(self, o): 1110 """Prints the annotation. 1111 1112 Arguments: 1113 o: The object to write the output to. 1114 """ 1115 self.annotation.print_desc(o) 1116 1117class Leaf(abc.ABC): 1118 """An abstract class to represent the leaves in the transparency log.""" 1119 FORMAT_STRING = ('!B' # Version 1120 'Q' # Timestamp 1121 'B' # LeafType 1122 ) 1123 1124 class LeafType(enum.Enum): 1125 VBMetaType = 0 1126 SignedVBMetaPrimaryAnnotationType = 1 1127 1128 def __init__(self, version=1, timestamp=0, leaf_type=LeafType.VBMetaType): 1129 """Build a new leaf.""" 1130 self.version = version 1131 self.timestamp = timestamp 1132 self.leaf_type = leaf_type 1133 1134 @classmethod 1135 def _parse_header(cls, stream): 1136 """Parses the header of a leaf. 1137 1138 This is called with the parse method of the subclasses. 1139 1140 Arguments: 1141 stream: a BytesIO to read the header from. 1142 1143 Returns: 1144 A tuple (version, timestamp, leaf_type). 1145 1146 Raises: 1147 AftlError: If the header cannot be decoded; or if the leaf type is 1148 unknown. 1149 """ 1150 data_length = struct.calcsize(cls.FORMAT_STRING) 1151 try: 1152 (version, timestamp, leaf_type) = struct.unpack( 1153 cls.FORMAT_STRING, stream.read(data_length)) 1154 except struct.error: 1155 raise AftlError("Not enough data to parse leaf header") 1156 try: 1157 leaf_type = cls.LeafType(leaf_type) 1158 except ValueError: 1159 raise AftlError("Unknown leaf type: {}".format(leaf_type)) 1160 return version, timestamp, leaf_type 1161 1162 @classmethod 1163 @abc.abstractmethod 1164 def parse(cls, data): 1165 """Parses a leaf and returned a new object. 1166 1167 This abstract method must be implemented by the subclass. It may use 1168 _parse_header to parse the common fields. 1169 1170 Arguments: 1171 data: a bytes-like object. 1172 1173 Returns: 1174 An object of the type of the particular subclass. 1175 1176 Raises: 1177 AftlError: If the leaf type is incorrect; or if the decoding failed. 1178 """ 1179 1180 @abc.abstractmethod 1181 def encode(self): 1182 """Encodes a leaf. 1183 1184 This abstract method must be implemented by the subclass. It may use 1185 _encode_header to encode the common fields. 1186 1187 Returns: 1188 A bytes with the encoded leaf. 1189 1190 Raises: 1191 AftlError: If the encoding failed. 1192 """ 1193 1194 def _get_expected_header_size(self): 1195 """Returns the size of the leaf header.""" 1196 return struct.calcsize(self.FORMAT_STRING) 1197 1198 def _encode_header(self, stream): 1199 """Encodes the header of the leaf. 1200 1201 This method is called by the encode method in the subclass. 1202 1203 Arguments: 1204 stream: a BytesIO to which the object is written. 1205 1206 Raises: 1207 AftlError: If the encoding failed. 1208 """ 1209 try: 1210 stream.write(struct.pack(self.FORMAT_STRING, self.version, self.timestamp, 1211 self.leaf_type.value)) 1212 except struct.error: 1213 raise AftlError('Unable to encode the leaf header') 1214 1215 def print_desc(self, o): 1216 """Prints the leaf header. 1217 1218 Arguments: 1219 o: The object to write the output to. 1220 """ 1221 i = ' ' * 6 1222 fmt = '{}{:23}{}\n' 1223 o.write(fmt.format(i, 'Version:', self.version)) 1224 o.write(fmt.format(i, 'Timestamp:', self.timestamp)) 1225 o.write(fmt.format(i, 'Type:', self.leaf_type)) 1226 1227 1228class SignedVBMetaPrimaryAnnotationLeaf(Leaf): 1229 """A Signed VBMetaPrimaryAnnotation leaf.""" 1230 1231 def __init__(self, version=1, timestamp=0, 1232 signed_vbmeta_primary_annotation=None): 1233 """Builds a new Signed VBMeta Primary Annotation leaf.""" 1234 super(SignedVBMetaPrimaryAnnotationLeaf, self).__init__( 1235 version=version, timestamp=timestamp, 1236 leaf_type=self.LeafType.SignedVBMetaPrimaryAnnotationType) 1237 if not signed_vbmeta_primary_annotation: 1238 signed_vbmeta_primary_annotation = SignedVBMetaPrimaryAnnotation() 1239 self.signed_vbmeta_primary_annotation = signed_vbmeta_primary_annotation 1240 1241 @property 1242 def annotation(self): 1243 """Returns the VBMetaPrimaryAnnotation contained in the leaf.""" 1244 return self.signed_vbmeta_primary_annotation.annotation 1245 1246 @property 1247 def signature(self): 1248 """Returns the Signature contained in the leaf.""" 1249 return self.signed_vbmeta_primary_annotation.signature 1250 1251 @classmethod 1252 def parse(cls, data): 1253 """Parses an encoded contained in data. 1254 1255 Arguments: 1256 data: a bytes-like object. 1257 1258 Returns: 1259 A SignedVBMetaPrimaryAnnotationLeaf. 1260 1261 Raises: 1262 AftlError if the leaf type is incorrect; or if the decoding failed. 1263 """ 1264 encoded_leaf = io.BytesIO(data) 1265 version, timestamp, leaf_type = Leaf._parse_header(encoded_leaf) 1266 if leaf_type != Leaf.LeafType.SignedVBMetaPrimaryAnnotationType: 1267 raise AftlError("Incorrect leaf type") 1268 signed_annotation = SignedVBMetaPrimaryAnnotation.parse(encoded_leaf) 1269 return cls(version=version, timestamp=timestamp, 1270 signed_vbmeta_primary_annotation=signed_annotation) 1271 1272 def get_expected_size(self): 1273 """Returns the size of the leaf.""" 1274 size = self._get_expected_header_size() 1275 if self.signed_vbmeta_primary_annotation: 1276 size += self.signed_vbmeta_primary_annotation.get_expected_size() 1277 return size 1278 1279 def encode(self): 1280 """Encodes the leaf. 1281 1282 Returns: 1283 bytes which contains the encoded leaf. 1284 1285 Raises: 1286 AftlError: If the encoding failed. 1287 """ 1288 stream = io.BytesIO() 1289 self._encode_header(stream) 1290 self.signed_vbmeta_primary_annotation.encode(stream) 1291 return stream.getvalue() 1292 1293 def print_desc(self, o): 1294 """Prints the leaf. 1295 1296 Arguments: 1297 o: The object to write the output to. 1298 """ 1299 i = ' ' * 4 1300 fmt = '{}{:25}{}\n' 1301 o.write(fmt.format(i, 'Leaf:', '')) 1302 super(SignedVBMetaPrimaryAnnotationLeaf, self).print_desc(o) 1303 self.signed_vbmeta_primary_annotation.print_desc(o) 1304 1305 1306class AftlImage(object): 1307 """A class for the AFTL image, which contains the transparency log ICPs. 1308 1309 This encapsulates an AFTL ICP section with all information required to 1310 validate an inclusion proof. 1311 1312 Attributes: 1313 image_header: A header for the section. 1314 icp_entries: A list of AftlIcpEntry objects representing the inclusion 1315 proofs. 1316 """ 1317 1318 def __init__(self, data=None): 1319 """Initializes a new AftlImage section. 1320 1321 Arguments: 1322 data: If not None, must be a bytearray representing an AftlImage. 1323 1324 Raises: 1325 AftlError: If the data does not represent a well-formed AftlImage. 1326 """ 1327 if data: 1328 image_header_bytes = data[0:AftlImageHeader.SIZE] 1329 self.image_header = AftlImageHeader(image_header_bytes) 1330 if not self.image_header.is_valid(): 1331 raise AftlError('Invalid AftlImageHeader.') 1332 icp_count = self.image_header.icp_count 1333 1334 # Jump past the header for entry deserialization. 1335 icp_index = AftlImageHeader.SIZE 1336 # Validate each entry. 1337 self.icp_entries = [] 1338 # add_icp_entry() updates entries and header, so set header count to 1339 # compensate. 1340 self.image_header.icp_count = 0 1341 for i in range(icp_count): 1342 # Get the entry header from the AftlImage. 1343 cur_icp_entry = AftlIcpEntry(data[icp_index:]) 1344 cur_icp_entry_size = cur_icp_entry.get_expected_size() 1345 # Now validate the entry structure. 1346 if not cur_icp_entry.is_valid(): 1347 raise AftlError('Validation of ICP entry {} failed.'.format(i)) 1348 self.add_icp_entry(cur_icp_entry) 1349 icp_index += cur_icp_entry_size 1350 else: 1351 self.image_header = AftlImageHeader() 1352 self.icp_entries = [] 1353 if not self.is_valid(): 1354 raise AftlError('Invalid AftlImage.') 1355 1356 def add_icp_entry(self, icp_entry): 1357 """Adds a new AftlIcpEntry to the AftlImage, updating fields as needed. 1358 1359 Arguments: 1360 icp_entry: An AftlIcpEntry structure. 1361 """ 1362 self.icp_entries.append(icp_entry) 1363 self.image_header.icp_count += 1 1364 self.image_header.aftl_image_size += icp_entry.get_expected_size() 1365 1366 def verify_vbmeta_image(self, vbmeta_image, transparency_log_pub_keys): 1367 """Verifies the contained inclusion proof given the public log key. 1368 1369 Arguments: 1370 vbmeta_image: The vbmeta_image that should be verified against the 1371 inclusion proof. 1372 transparency_log_pub_keys: List of paths to PEM files containing trusted 1373 public keys that correspond with the transparency_logs. 1374 1375 Returns: 1376 True if all the inclusion proofs in the AfltDescriptor validate, are 1377 signed by one of the give transparency log public keys; otherwise false. 1378 """ 1379 if not transparency_log_pub_keys or not self.icp_entries: 1380 return False 1381 1382 icp_verified = 0 1383 for icp_entry in self.icp_entries: 1384 verified = False 1385 for pub_key in transparency_log_pub_keys: 1386 if icp_entry.verify_vbmeta_image(vbmeta_image, pub_key): 1387 verified = True 1388 break 1389 if verified: 1390 icp_verified += 1 1391 return icp_verified == len(self.icp_entries) 1392 1393 def encode(self): 1394 """Serialize the AftlImage to a bytearray(). 1395 1396 Returns: 1397 A bytearray() with the encoded AFTL image. 1398 1399 Raises: 1400 AftlError: If invalid AFTL image structure. 1401 """ 1402 # The header and entries are guaranteed to be valid when encode is called. 1403 # Check the entire structure as a whole. 1404 if not self.is_valid(): 1405 raise AftlError('Invalid AftlImage structure.') 1406 1407 aftl_image = bytearray() 1408 aftl_image.extend(self.image_header.encode()) 1409 for icp_entry in self.icp_entries: 1410 aftl_image.extend(icp_entry.encode()) 1411 return aftl_image 1412 1413 def is_valid(self): 1414 """Ensures that values in the AftlImage are sane. 1415 1416 Returns: 1417 True if the values in the AftlImage are sane, False otherwise. 1418 """ 1419 if not self.image_header.is_valid(): 1420 return False 1421 1422 if self.image_header.icp_count != len(self.icp_entries): 1423 return False 1424 1425 for icp_entry in self.icp_entries: 1426 if not icp_entry.is_valid(): 1427 return False 1428 return True 1429 1430 def print_desc(self, o): 1431 """Print the AFTL image. 1432 1433 Arguments: 1434 o: The object to write the output to. 1435 """ 1436 o.write('Android Firmware Transparency Image:\n') 1437 self.image_header.print_desc(o) 1438 for i, icp_entry in enumerate(self.icp_entries): 1439 o.write(' Entry #{}:\n'.format(i + 1)) 1440 icp_entry.print_desc(o) 1441 1442 1443class AftlCommunication(object): 1444 """Class to abstract the communication layer with the transparency log.""" 1445 1446 def __init__(self, transparency_log_config, timeout): 1447 """Initializes the object. 1448 1449 Arguments: 1450 transparency_log_config: A TransparencyLogConfig instance. 1451 timeout: Duration in seconds before requests to the AFTL times out. A 1452 value of 0 or None means there will be no timeout. 1453 """ 1454 self.transparency_log_config = transparency_log_config 1455 if timeout: 1456 self.timeout = timeout 1457 else: 1458 self.timeout = None 1459 1460 def add_vbmeta(self, request): 1461 """Calls the AddVBMeta RPC on the AFTL server. 1462 1463 Arguments: 1464 request: An AddVBMetaRequest message. 1465 1466 Returns: 1467 An AddVBMetaResponse message. 1468 1469 Raises: 1470 AftlError: If grpc or the proto modules cannot be loaded, if there is an 1471 error communicating with the log. 1472 """ 1473 raise NotImplementedError( 1474 'add_vbmeta() needs to be implemented by subclass.') 1475 1476 1477class AftlGrpcCommunication(AftlCommunication): 1478 """Class that implements GRPC communication to the AFTL server.""" 1479 1480 def add_vbmeta(self, request): 1481 """Calls the AddVBMeta RPC on the AFTL server. 1482 1483 Arguments: 1484 request: An AddVBMetaRequest message. 1485 1486 Returns: 1487 An AddVBMetaResponse message. 1488 1489 Raises: 1490 AftlError: If grpc or the proto modules cannot be loaded, if there is an 1491 error communicating with the log. 1492 """ 1493 # Import grpc now to avoid global dependencies as it otherwise breaks 1494 # running unittest with atest. 1495 try: 1496 import grpc # pylint: disable=import-outside-toplevel 1497 from proto import api_pb2_grpc # pylint: disable=import-outside-toplevel 1498 except ImportError as e: 1499 err_str = 'grpc can be installed with python pip install grpcio.\n' 1500 raise AftlError('Failed to import module: ({}).\n{}'.format(e, err_str)) 1501 1502 # Set up the gRPC channel with the transparency log. 1503 sys.stdout.write('Preparing to request inclusion proof from {}. This could ' 1504 'take ~30 seconds for the process to complete.\n'.format( 1505 self.transparency_log_config.target)) 1506 channel = grpc.insecure_channel(self.transparency_log_config.target) 1507 stub = api_pb2_grpc.AFTLogStub(channel) 1508 1509 metadata = [] 1510 if self.transparency_log_config.api_key: 1511 metadata.append(('x-api-key', self.transparency_log_config.api_key)) 1512 1513 # Attempt to transmit to the transparency log. 1514 sys.stdout.write('ICP is about to be requested from transparency log ' 1515 'with domain {}.\n'.format( 1516 self.transparency_log_config.target)) 1517 try: 1518 response = stub.AddVBMeta(request, timeout=self.timeout, 1519 metadata=metadata) 1520 except grpc.RpcError as e: 1521 raise AftlError('Error: grpc failure ({})'.format(e)) 1522 return response 1523 1524 1525class Aftl(avbtool.Avb): 1526 """Business logic for aftltool command-line tool.""" 1527 1528 def get_vbmeta_image(self, image_filename): 1529 """Gets the VBMeta struct bytes from image. 1530 1531 Arguments: 1532 image_filename: Image file to get information from. 1533 1534 Returns: 1535 A tuple with following elements: 1536 1. A bytearray with the vbmeta structure or None if the file does not 1537 contain a VBMeta structure. 1538 2. The VBMeta image footer. 1539 """ 1540 # Reads and parses the vbmeta image. 1541 try: 1542 image = avbtool.ImageHandler(image_filename, read_only=True) 1543 except (IOError, ValueError) as e: 1544 sys.stderr.write('The image does not contain a valid VBMeta structure: ' 1545 '{}.\n'.format(e)) 1546 return None, None 1547 1548 try: 1549 (footer, header, _, _) = self._parse_image(image) 1550 except avbtool.AvbError as e: 1551 sys.stderr.write('The image cannot be parsed: {}.\n'.format(e)) 1552 return None, None 1553 1554 # Seeks for the start of the vbmeta image and calculates its size. 1555 offset = 0 1556 if footer: 1557 offset = footer.vbmeta_offset 1558 vbmeta_image_size = (offset + header.SIZE 1559 + header.authentication_data_block_size 1560 + header.auxiliary_data_block_size) 1561 1562 # Reads the vbmeta image bytes. 1563 try: 1564 image.seek(offset) 1565 except RuntimeError as e: 1566 sys.stderr.write('Given vbmeta image offset is invalid: {}.\n'.format(e)) 1567 return None, None 1568 return image.read(vbmeta_image_size), footer 1569 1570 def get_aftl_image(self, image_filename): 1571 """Gets the AftlImage from image. 1572 1573 Arguments: 1574 image_filename: Image file to get information from. 1575 1576 Returns: 1577 An AftlImage or None if the file does not contain a AftlImage. 1578 """ 1579 # Reads the vbmeta image bytes. 1580 vbmeta_image, _ = self.get_vbmeta_image(image_filename) 1581 if not vbmeta_image: 1582 return None 1583 1584 try: 1585 image = avbtool.ImageHandler(image_filename, read_only=True) 1586 except ValueError as e: 1587 sys.stderr.write('The image does not contain a valid VBMeta structure: ' 1588 '{}.\n'.format(e)) 1589 return None 1590 1591 # Seeks for the start of the AftlImage. 1592 try: 1593 image.seek(len(vbmeta_image)) 1594 except RuntimeError as e: 1595 sys.stderr.write('Given AftlImage image offset is invalid: {}.\n' 1596 .format(e)) 1597 return None 1598 1599 # Parses the header for the AftlImage size. 1600 tmp_header_bytes = image.read(AftlImageHeader.SIZE) 1601 if not tmp_header_bytes or len(tmp_header_bytes) != AftlImageHeader.SIZE: 1602 sys.stderr.write('This image does not contain an AftlImage.\n') 1603 return None 1604 1605 try: 1606 tmp_header = AftlImageHeader(tmp_header_bytes) 1607 except AftlError as e: 1608 sys.stderr.write('This image does not contain a valid AftlImage: ' 1609 '{}.\n'.format(e)) 1610 return None 1611 1612 # Resets to the beginning of the AftlImage. 1613 try: 1614 image.seek(len(vbmeta_image)) 1615 except RuntimeError as e: 1616 sys.stderr.write('Given AftlImage image offset is invalid: {}.\n' 1617 .format(e)) 1618 return None 1619 1620 # Parses the full AftlImage. 1621 aftl_image_bytes = image.read(tmp_header.aftl_image_size) 1622 aftl_image = None 1623 try: 1624 aftl_image = AftlImage(aftl_image_bytes) 1625 except AftlError as e: 1626 sys.stderr.write('The image does not contain a valid AftlImage: ' 1627 '{}.\n'.format(e)) 1628 return aftl_image 1629 1630 def info_image_icp(self, vbmeta_image_path, output): 1631 """Implements the 'info_image_icp' command. 1632 1633 Arguments: 1634 vbmeta_image_path: Image file to get information from. 1635 output: Output file to write human-readable information to (file object). 1636 1637 Returns: 1638 True if the given image has an AftlImage and could successfully 1639 be processed; otherwise False. 1640 """ 1641 aftl_image = self.get_aftl_image(vbmeta_image_path) 1642 if not aftl_image: 1643 return False 1644 aftl_image.print_desc(output) 1645 return True 1646 1647 def verify_image_icp(self, vbmeta_image_path, transparency_log_pub_keys, 1648 output): 1649 """Implements the 'verify_image_icp' command. 1650 1651 Arguments: 1652 vbmeta_image_path: Image file to get information from. 1653 transparency_log_pub_keys: List of paths to PEM files containing trusted 1654 public keys that correspond with the transparency_logs. 1655 output: Output file to write human-readable information to (file object). 1656 1657 Returns: 1658 True if for the given image the inclusion proof validates; otherwise 1659 False. 1660 """ 1661 vbmeta_image, _ = self.get_vbmeta_image(vbmeta_image_path) 1662 aftl_image = self.get_aftl_image(vbmeta_image_path) 1663 if not aftl_image or not vbmeta_image: 1664 return False 1665 verified = aftl_image.verify_vbmeta_image(vbmeta_image, 1666 transparency_log_pub_keys) 1667 if not verified: 1668 output.write('The inclusion proofs for the image do not validate.\n') 1669 return False 1670 output.write('The inclusion proofs for the image successfully validate.\n') 1671 return True 1672 1673 def request_inclusion_proof(self, transparency_log_config, vbmeta_image, 1674 version_inc, manufacturer_key_path, 1675 signing_helper, signing_helper_with_files, 1676 timeout, aftl_comms=None): 1677 """Packages and sends a request to the specified transparency log. 1678 1679 Arguments: 1680 transparency_log_config: A TransparencyLogConfig instance. 1681 vbmeta_image: A bytearray with the VBMeta image. 1682 version_inc: Subcomponent of the build fingerprint. 1683 manufacturer_key_path: Path to key used to sign messages sent to the 1684 transparency log servers. 1685 signing_helper: Program which signs a hash and returns a signature. 1686 signing_helper_with_files: Same as signing_helper but uses files instead. 1687 timeout: Duration in seconds before requests to the transparency log 1688 timeout. 1689 aftl_comms: A subclass of the AftlCommunication class. The default is 1690 to use AftlGrpcCommunication. 1691 1692 Returns: 1693 An AftlIcpEntry with the inclusion proof for the log entry. 1694 1695 Raises: 1696 AftlError: If grpc or the proto modules cannot be loaded, if there is an 1697 error communicating with the log, if the manufacturer_key_path 1698 cannot be decoded, or if the log submission cannot be signed. 1699 """ 1700 # Calculate the hash of the vbmeta image. 1701 vbmeta_hash = hashlib.sha256(vbmeta_image).digest() 1702 1703 # Extract the key data from the PEM file if of size 4096. 1704 manufacturer_key = avbtool.RSAPublicKey(manufacturer_key_path) 1705 if manufacturer_key.num_bits != 4096: 1706 raise AftlError('Manufacturer keys not of size 4096: {}'.format( 1707 manufacturer_key.num_bits)) 1708 manufacturer_key_data = rsa_key_read_pem_bytes(manufacturer_key_path) 1709 1710 # Calculate the hash of the manufacturer key data. 1711 m_key_hash = hashlib.sha256(manufacturer_key_data).digest() 1712 1713 # Build VBMetaPrimaryAnnotation with that data. 1714 annotation = VBMetaPrimaryAnnotation( 1715 vbmeta_hash=vbmeta_hash, version_incremental=version_inc, 1716 manufacturer_key_hash=m_key_hash) 1717 1718 # Sign annotation and add it to the request. 1719 signed_annotation = annotation.sign( 1720 manufacturer_key_path, signing_helper=signing_helper, 1721 signing_helper_with_files=signing_helper_with_files) 1722 1723 encoded_signed_annotation = io.BytesIO() 1724 signed_annotation.encode(encoded_signed_annotation) 1725 request = api_pb2.AddVBMetaRequest( 1726 vbmeta=vbmeta_image, 1727 signed_vbmeta_primary_annotation=encoded_signed_annotation.getvalue()) 1728 1729 # Submit signed VBMeta annotation to the server. 1730 if not aftl_comms: 1731 aftl_comms = AftlGrpcCommunication(transparency_log_config, timeout) 1732 response = aftl_comms.add_vbmeta(request) 1733 1734 # Return an AftlIcpEntry representing this response. 1735 icp_entry = AftlIcpEntry() 1736 icp_entry.translate_response(transparency_log_config.target, response) 1737 return icp_entry 1738 1739 def make_icp_from_vbmeta(self, vbmeta_image_path, output, 1740 signing_helper, signing_helper_with_files, 1741 version_incremental, transparency_log_configs, 1742 manufacturer_key, padding_size, timeout): 1743 """Generates a vbmeta image with inclusion proof given a vbmeta image. 1744 1745 The AftlImage contains the information required to validate an inclusion 1746 proof for a specific vbmeta image. It consists of a header (struct 1747 AftlImageHeader) and zero or more entry structures (struct AftlIcpEntry) 1748 that contain the vbmeta leaf hash, tree size, root hash, inclusion proof 1749 hashes, and the signature for the root hash. 1750 1751 The vbmeta image, its hash, the version_incremental part of the build 1752 fingerprint, and the hash of the manufacturer key are sent to the 1753 transparency log, with the message signed by the manufacturer key. 1754 An inclusion proof is calculated and returned. This inclusion proof is 1755 then packaged in an AftlImage structure. The existing vbmeta data is 1756 copied to a new file, appended with the AftlImage data, and written to 1757 output. Validation of the inclusion proof does not require 1758 communication with the transparency log. 1759 1760 Arguments: 1761 vbmeta_image_path: Path to a vbmeta image file. 1762 output: File to write the results to. 1763 signing_helper: Program which signs a hash and returns a signature. 1764 signing_helper_with_files: Same as signing_helper but uses files instead. 1765 version_incremental: A string representing the subcomponent of the 1766 build fingerprint used to identify the vbmeta in the transparency log. 1767 transparency_log_configs: List of TransparencyLogConfig used to request 1768 the inclusion proofs. 1769 manufacturer_key: Path to PEM file containting the key file used to sign 1770 messages sent to the transparency log servers. 1771 padding_size: If not 0, pads output so size is a multiple of the number. 1772 timeout: Duration in seconds before requests to the AFTL times out. A 1773 value of 0 or None means there will be no timeout. 1774 1775 Returns: 1776 True if the inclusion proofs could be fetched from the transparency log 1777 servers and could be successfully validated; otherwise False. 1778 """ 1779 # Retrieves vbmeta structure from given partition image. 1780 vbmeta_image, footer = self.get_vbmeta_image(vbmeta_image_path) 1781 1782 # Fetches inclusion proofs for vbmeta structure from all transparency logs. 1783 aftl_image = AftlImage() 1784 for log_config in transparency_log_configs: 1785 try: 1786 icp_entry = self.request_inclusion_proof(log_config, vbmeta_image, 1787 version_incremental, 1788 manufacturer_key, 1789 signing_helper, 1790 signing_helper_with_files, 1791 timeout) 1792 if not icp_entry.verify_vbmeta_image(vbmeta_image, log_config.pub_key): 1793 sys.stderr.write('The inclusion proof from {} could not be verified.' 1794 '\n'.format(log_config.target)) 1795 aftl_image.add_icp_entry(icp_entry) 1796 except AftlError as e: 1797 # The inclusion proof request failed. Continue and see if others will. 1798 sys.stderr.write('Requesting inclusion proof failed: {}.\n'.format(e)) 1799 continue 1800 1801 # Checks that the resulting AftlImage is sane. 1802 if aftl_image.image_header.icp_count != len(transparency_log_configs): 1803 sys.stderr.write('Valid inclusion proofs could only be retrieved from {} ' 1804 'out of {} transparency logs.\n' 1805 .format(aftl_image.image_header.icp_count, 1806 len(transparency_log_configs))) 1807 return False 1808 if not aftl_image.is_valid(): 1809 sys.stderr.write('Resulting AftlImage structure is malformed.\n') 1810 return False 1811 keys = [log.pub_key for log in transparency_log_configs] 1812 if not aftl_image.verify_vbmeta_image(vbmeta_image, keys): 1813 sys.stderr.write('Resulting AftlImage inclusion proofs do not ' 1814 'validate.\n') 1815 return False 1816 1817 # Writes original VBMeta image, followed by the AftlImage into the output. 1818 if footer: # Checks if it is a chained partition. 1819 # TODO(b/147217370): Determine the best way to handle chained partitions 1820 # like the system.img. Currently, we only put the main vbmeta.img in the 1821 # transparency log. 1822 sys.stderr.write('Image has a footer and ICP for this format is not ' 1823 'implemented.\n') 1824 return False 1825 1826 output.seek(0) 1827 output.write(vbmeta_image) 1828 encoded_aftl_image = aftl_image.encode() 1829 output.write(encoded_aftl_image) 1830 1831 if padding_size > 0: 1832 total_image_size = len(vbmeta_image) + len(encoded_aftl_image) 1833 padded_size = avbtool.round_to_multiple(total_image_size, padding_size) 1834 padding_needed = padded_size - total_image_size 1835 output.write('\0' * padding_needed) 1836 1837 sys.stdout.write('VBMeta image with AFTL image successfully created.\n') 1838 return True 1839 1840 def _load_test_process_function(self, vbmeta_image_path, 1841 transparency_log_config, 1842 manufacturer_key, 1843 process_number, submission_count, 1844 preserve_icp_images, timeout, result_queue): 1845 """Function to be used by multiprocessing.Process. 1846 1847 Arguments: 1848 vbmeta_image_path: Path to a vbmeta image file. 1849 transparency_log_config: A TransparencyLogConfig instance used to request 1850 an inclusion proof. 1851 manufacturer_key: Path to PEM file containting the key file used to sign 1852 messages sent to the transparency log servers. 1853 process_number: The number of the processes executing the function. 1854 submission_count: Number of total submissions to perform per 1855 process_count. 1856 preserve_icp_images: Boolean to indicate if the generated vbmeta image 1857 files with inclusion proofs should be preserved in the temporary 1858 directory. 1859 timeout: Duration in seconds before requests to the AFTL times out. A 1860 value of 0 or None means there will be no timeout. 1861 result_queue: Multiprocessing.Queue object for posting execution results. 1862 """ 1863 for count in range(0, submission_count): 1864 version_incremental = 'aftl_load_testing_{}_{}'.format(process_number, 1865 count) 1866 output_file = os.path.join(tempfile.gettempdir(), 1867 '{}_icp.img'.format(version_incremental)) 1868 output = open(output_file, 'wb') 1869 1870 # Instrumented section. 1871 start_time = time.time() 1872 result = self.make_icp_from_vbmeta( 1873 vbmeta_image_path=vbmeta_image_path, 1874 output=output, 1875 signing_helper=None, 1876 signing_helper_with_files=None, 1877 version_incremental=version_incremental, 1878 transparency_log_configs=[transparency_log_config], 1879 manufacturer_key=manufacturer_key, 1880 padding_size=0, 1881 timeout=timeout) 1882 end_time = time.time() 1883 1884 output.close() 1885 if not preserve_icp_images: 1886 os.unlink(output_file) 1887 1888 # Puts the result onto the result queue. 1889 execution_time = end_time - start_time 1890 result_queue.put((start_time, end_time, execution_time, 1891 version_incremental, result)) 1892 1893 def load_test_aftl(self, vbmeta_image_path, output, transparency_log_config, 1894 manufacturer_key, 1895 process_count, submission_count, stats_filename, 1896 preserve_icp_images, timeout): 1897 """Performs multi-threaded load test on a given AFTL and prints stats. 1898 1899 Arguments: 1900 vbmeta_image_path: Path to a vbmeta image file. 1901 output: File to write the report to. 1902 transparency_log_config: A TransparencyLogConfig used to request an 1903 inclusion proof. 1904 manufacturer_key: Path to PEM file containting the key file used to sign 1905 messages sent to the transparency log servers. 1906 process_count: Number of processes used for parallel testing. 1907 submission_count: Number of total submissions to perform per 1908 process_count. 1909 stats_filename: Path to the stats file to write the raw execution data to. 1910 If None, it will be written to the temp directory. 1911 preserve_icp_images: Boolean to indicate if the generated vbmeta 1912 image files with inclusion proofs should preserved. 1913 timeout: Duration in seconds before requests to the AFTL times out. A 1914 value of 0 or None means there will be no timeout. 1915 1916 Returns: 1917 True if the load tested succeeded without errors; otherwise False. 1918 """ 1919 if process_count < 1 or submission_count < 1: 1920 sys.stderr.write('Values for --processes/--submissions ' 1921 'must be at least 1.\n') 1922 return False 1923 1924 if not stats_filename: 1925 stats_filename = os.path.join( 1926 tempfile.gettempdir(), 1927 'load_test_p{}_s{}.csv'.format(process_count, submission_count)) 1928 1929 stats_file = None 1930 try: 1931 stats_file = open(stats_filename, 'wt') 1932 stats_file.write('start_time,end_time,execution_time,version_incremental,' 1933 'result\n') 1934 except IOError as e: 1935 sys.stderr.write('Could not open stats file {}: {}.\n' 1936 .format(stats_file, e)) 1937 return False 1938 1939 # Launch all the processes with their workloads. 1940 result_queue = multiprocessing.Queue() 1941 processes = set() 1942 execution_times = [] 1943 results = [] 1944 for i in range(0, process_count): 1945 p = multiprocessing.Process( 1946 target=self._load_test_process_function, 1947 args=(vbmeta_image_path, transparency_log_config, 1948 manufacturer_key, i, submission_count, 1949 preserve_icp_images, timeout, result_queue)) 1950 p.start() 1951 processes.add(p) 1952 1953 while processes: 1954 # Processes the results queue and writes these to a stats file. 1955 try: 1956 (start_time, end_time, execution_time, version_incremental, 1957 result) = result_queue.get(timeout=1) 1958 stats_file.write('{},{},{},{},{}\n'.format(start_time, end_time, 1959 execution_time, 1960 version_incremental, result)) 1961 execution_times.append(execution_time) 1962 results.append(result) 1963 except queue.Empty: 1964 pass 1965 1966 # Checks if processes are still alive; if not clean them up. join() would 1967 # have been nicer but we want to continously stream out the stats to file. 1968 for p in processes.copy(): 1969 if not p.is_alive(): 1970 processes.remove(p) 1971 1972 # Prepares stats. 1973 executions = sorted(execution_times) 1974 execution_count = len(execution_times) 1975 median = 0 1976 1977 # pylint: disable=old-division 1978 if execution_count % 2 == 0: 1979 median = (executions[execution_count // 2 - 1] 1980 + executions[execution_count // 2]) / 2 1981 else: 1982 median = executions[execution_count // 2] 1983 1984 # Outputs the stats report. 1985 o = output 1986 o.write('Load testing results:\n') 1987 o.write(' Processes: {}\n'.format(process_count)) 1988 o.write(' Submissions per process: {}\n'.format(submission_count)) 1989 o.write('\n') 1990 o.write(' Submissions:\n') 1991 o.write(' Total: {}\n'.format(len(executions))) 1992 o.write(' Succeeded: {}\n'.format(results.count(True))) 1993 o.write(' Failed: {}\n'.format(results.count(False))) 1994 o.write('\n') 1995 o.write(' Submission execution durations:\n') 1996 o.write(' Average: {:.2f} sec\n'.format( 1997 sum(execution_times) / execution_count)) 1998 o.write(' Median: {:.2f} sec\n'.format(median)) 1999 o.write(' Min: {:.2f} sec\n'.format(min(executions))) 2000 o.write(' Max: {:.2f} sec\n'.format(max(executions))) 2001 2002 # Close the stats file. 2003 stats_file.close() 2004 if results.count(False): 2005 return False 2006 return True 2007 2008 2009class TransparencyLogConfig(object): 2010 """Class that gathers the fields representing a transparency log. 2011 2012 Attributes: 2013 target: The hostname and port of the server in hostname:port format. 2014 pub_key: A PEM file that contains the public key of the transparency 2015 log server. 2016 api_key: The API key to use to interact with the transparency log 2017 server. 2018 """ 2019 2020 @staticmethod 2021 def from_argument(arg): 2022 """Build an object from a command line argument string. 2023 2024 Arguments: 2025 arg: The transparency log as passed in the command line argument. 2026 It must be in the format: host:port,key_file[,api_key]. 2027 2028 Returns: 2029 The TransparencyLogConfig instance. 2030 2031 Raises: 2032 argparse.ArgumentTypeError: If the format of arg is invalid. 2033 """ 2034 api_key = None 2035 try: 2036 target, pub_key, *rest = arg.split(",", maxsplit=2) 2037 except ValueError: 2038 raise argparse.ArgumentTypeError("incorrect format for transparency log " 2039 "server, expected " 2040 "host:port,publickey_file.") 2041 if not target: 2042 raise argparse.ArgumentTypeError("incorrect format for transparency log " 2043 "server: host:port cannot be empty.") 2044 if not pub_key: 2045 raise argparse.ArgumentTypeError("incorrect format for transparency log " 2046 "server: publickey_file cannot be " 2047 "empty.") 2048 if rest: 2049 api_key = rest[0] 2050 return TransparencyLogConfig(target, pub_key, api_key) 2051 2052 def __init__(self, target, pub_key, api_key=None): 2053 """Initializes a new TransparencyLogConfig object.""" 2054 self.target = target 2055 self.pub_key = pub_key 2056 self.api_key = api_key 2057 2058 2059class AftlTool(avbtool.AvbTool): 2060 """Object for aftltool command-line tool.""" 2061 2062 def __init__(self): 2063 """Initializer method.""" 2064 self.aftl = Aftl() 2065 super(AftlTool, self).__init__() 2066 2067 def make_icp_from_vbmeta(self, args): 2068 """Implements the 'make_icp_from_vbmeta' sub-command.""" 2069 args = self._fixup_common_args(args) 2070 return self.aftl.make_icp_from_vbmeta(args.vbmeta_image_path, 2071 args.output, 2072 args.signing_helper, 2073 args.signing_helper_with_files, 2074 args.version_incremental, 2075 args.transparency_log_servers, 2076 args.manufacturer_key, 2077 args.padding_size, 2078 args.timeout) 2079 2080 def info_image_icp(self, args): 2081 """Implements the 'info_image_icp' sub-command.""" 2082 return self.aftl.info_image_icp(args.vbmeta_image_path.name, args.output) 2083 2084 def verify_image_icp(self, args): 2085 """Implements the 'verify_image_icp' sub-command.""" 2086 return self.aftl.verify_image_icp(args.vbmeta_image_path.name, 2087 args.transparency_log_pub_keys, 2088 args.output) 2089 2090 def load_test_aftl(self, args): 2091 """Implements the 'load_test_aftl' sub-command.""" 2092 return self.aftl.load_test_aftl(args.vbmeta_image_path, 2093 args.output, 2094 args.transparency_log_server, 2095 args.manufacturer_key, 2096 args.processes, 2097 args.submissions, 2098 args.stats_file, 2099 args.preserve_icp_images, 2100 args.timeout) 2101 2102 def run(self, argv): 2103 """Command-line processor. 2104 2105 Arguments: 2106 argv: Pass sys.argv from main. 2107 """ 2108 parser = argparse.ArgumentParser() 2109 subparsers = parser.add_subparsers(title='subcommands') 2110 2111 # Command: make_icp_from_vbmeta 2112 sub_parser = subparsers.add_parser('make_icp_from_vbmeta', 2113 help='Makes an ICP enhanced vbmeta image' 2114 ' from an existing vbmeta image.') 2115 sub_parser.add_argument('--output', 2116 help='Output file name.', 2117 type=argparse.FileType('wb'), 2118 default=sys.stdout) 2119 sub_parser.add_argument('--vbmeta_image_path', 2120 help='Path to a generate vbmeta image file.', 2121 required=True) 2122 sub_parser.add_argument('--version_incremental', 2123 help='Current build ID.', 2124 required=True) 2125 sub_parser.add_argument('--manufacturer_key', 2126 help='Path to the PEM file containing the ' 2127 'manufacturer key for use with the log.', 2128 required=True) 2129 sub_parser.add_argument('--transparency_log_servers', 2130 help='List of transparency log servers in ' 2131 'host:port,publickey_file[,api_key] format. The ' 2132 'publickey_file must be in the PEM format.', 2133 nargs='+', type=TransparencyLogConfig.from_argument) 2134 sub_parser.add_argument('--padding_size', 2135 metavar='NUMBER', 2136 help='If non-zero, pads output with NUL bytes so ' 2137 'its size is a multiple of NUMBER (default: 0)', 2138 type=avbtool.parse_number, 2139 default=0) 2140 sub_parser.add_argument('--timeout', 2141 metavar='SECONDS', 2142 help='Timeout in seconds for transparency log ' 2143 'requests (default: 600 sec). A value of 0 means ' 2144 'no timeout.', 2145 type=avbtool.parse_number, 2146 default=600) 2147 self._add_common_args(sub_parser) 2148 sub_parser.set_defaults(func=self.make_icp_from_vbmeta) 2149 2150 # Command: info_image_icp 2151 sub_parser = subparsers.add_parser( 2152 'info_image_icp', 2153 help='Show information about AFTL ICPs in vbmeta or footer.') 2154 sub_parser.add_argument('--vbmeta_image_path', 2155 help='Path to vbmeta image for AFTL information.', 2156 type=argparse.FileType('rb'), 2157 required=True) 2158 sub_parser.add_argument('--output', 2159 help='Write info to file', 2160 type=argparse.FileType('wt'), 2161 default=sys.stdout) 2162 sub_parser.set_defaults(func=self.info_image_icp) 2163 2164 # Arguments for verify_image_icp. 2165 sub_parser = subparsers.add_parser( 2166 'verify_image_icp', 2167 help='Verify AFTL ICPs in vbmeta or footer.') 2168 2169 sub_parser.add_argument('--vbmeta_image_path', 2170 help='Image to verify the inclusion proofs.', 2171 type=argparse.FileType('rb'), 2172 required=True) 2173 sub_parser.add_argument('--transparency_log_pub_keys', 2174 help='Paths to PEM files containing transparency ' 2175 'log server key(s). This must not be None.', 2176 nargs='*', 2177 required=True) 2178 sub_parser.add_argument('--output', 2179 help='Write info to file', 2180 type=argparse.FileType('wt'), 2181 default=sys.stdout) 2182 sub_parser.set_defaults(func=self.verify_image_icp) 2183 2184 # Command: load_test_aftl 2185 sub_parser = subparsers.add_parser( 2186 'load_test_aftl', 2187 help='Perform load testing against one AFTL log server. Note: This MUST' 2188 ' not be performed against a production system.') 2189 sub_parser.add_argument('--vbmeta_image_path', 2190 help='Path to a generate vbmeta image file.', 2191 required=True) 2192 sub_parser.add_argument('--output', 2193 help='Write report to file.', 2194 type=argparse.FileType('wt'), 2195 default=sys.stdout) 2196 sub_parser.add_argument('--manufacturer_key', 2197 help='Path to the PEM file containing the ' 2198 'manufacturer key for use with the log.', 2199 required=True) 2200 sub_parser.add_argument('--transparency_log_server', 2201 help='Transparency log server to test against in ' 2202 'host:port,publickey_file[,api_key] format. The ' 2203 'publickey_file must be in the PEM format.', 2204 required=True, 2205 type=TransparencyLogConfig.from_argument) 2206 sub_parser.add_argument('--processes', 2207 help='Number of parallel processes to use for ' 2208 'testing (default: 1).', 2209 type=avbtool.parse_number, 2210 default=1) 2211 sub_parser.add_argument('--submissions', 2212 help='Number of submissions to perform against the ' 2213 'log per process (default: 1).', 2214 type=avbtool.parse_number, 2215 default=1) 2216 sub_parser.add_argument('--stats_file', 2217 help='Path to the stats file to write the raw ' 2218 'execution data to (Default: ' 2219 'load_test_p[processes]_s[submissions].csv.') 2220 sub_parser.add_argument('--preserve_icp_images', 2221 help='Boolean flag to indicate if the generated ' 2222 'vbmeta image files with inclusion proofs should ' 2223 'preserved.', 2224 action='store_true') 2225 sub_parser.add_argument('--timeout', 2226 metavar='SECONDS', 2227 help='Timeout in seconds for transparency log ' 2228 'requests (default: 0). A value of 0 means ' 2229 'no timeout.', 2230 type=avbtool.parse_number, 2231 default=0) 2232 sub_parser.set_defaults(func=self.load_test_aftl) 2233 2234 args = parser.parse_args(argv[1:]) 2235 if not 'func' in args: 2236 # This error gets raised when the command line tool is called without any 2237 # arguments. It mimics the original Python 2 behavior. 2238 parser.print_usage() 2239 print('aftltool: error: too few arguments') 2240 sys.exit(2) 2241 try: 2242 success = args.func(args) 2243 except AftlError as e: 2244 # Signals to calling tools that an unhandled exception occured. 2245 sys.stderr.write('Unhandled AftlError occured: {}\n'.format(e)) 2246 sys.exit(2) 2247 2248 if not success: 2249 # Signals to calling tools that the command has failed. 2250 sys.exit(1) 2251 2252if __name__ == '__main__': 2253 tool = AftlTool() 2254 tool.run(sys.argv) 2255