1#!/usr/bin/env python 2 3# Copyright 2020, The Android Open Source Project 4# 5# Permission is hereby granted, free of charge, to any person 6# obtaining a copy of this software and associated documentation 7# files (the "Software"), to deal in the Software without 8# restriction, including without limitation the rights to use, copy, 9# modify, merge, publish, distribute, sublicense, and/or sell copies 10# of the Software, and to permit persons to whom the Software is 11# furnished to do so, subject to the following conditions: 12# 13# The above copyright notice and this permission notice shall be 14# included in all copies or substantial portions of the Software. 15# 16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 20# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 21# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 22# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 23# SOFTWARE. 24# 25"""Command-line tool for AFTL support for Android Verified Boot images.""" 26 27from __future__ import division 28 29import argparse 30import base64 31import binascii 32import hashlib 33import json 34import multiprocessing 35import os 36import Queue # pylint: disable=bad-python3-import 37import struct 38import subprocess 39import sys 40import tempfile 41import time 42 43 44import avbtool 45from proto import aftl_pb2 46from proto import api_pb2 47from proto.crypto import sigpb 48 49 50class AftlError(Exception): 51 """Application-specific errors. 52 53 These errors represent issues for which a stack-trace should not be 54 presented. 55 56 Attributes: 57 message: Error message. 58 """ 59 60 def __init__(self, message): 61 Exception.__init__(self, message) 62 63 64def rsa_key_read_pem_bytes(key_path): 65 """Reads the bytes out of the passed in PEM file. 66 67 Arguments: 68 key_path: A string containing the path to the PEM file. 69 70 Returns: 71 A bytearray containing the DER encoded bytes in the PEM file. 72 73 Raises: 74 AftlError: If openssl cannot decode the PEM file. 75 """ 76 # Use openssl to decode the PEM file. 77 args = ['openssl', 'rsa', '-in', key_path, '-pubout', '-outform', 'DER'] 78 p = subprocess.Popen(args, 79 stdin=subprocess.PIPE, 80 stdout=subprocess.PIPE, 81 stderr=subprocess.PIPE) 82 (pout, perr) = p.communicate() 83 retcode = p.wait() 84 if retcode != 0: 85 raise AftlError('Error decoding: {}'.format(perr)) 86 return bytearray(pout) 87 88 89def check_signature(log_root, log_root_sig, 90 transparency_log_pub_key): 91 """Validates the signature provided by the transparency log. 92 93 Arguments: 94 log_root: The transparency log_root data structure. 95 log_root_sig: The signature of the transparency log_root data structure. 96 transparency_log_pub_key: The file path to the transparency log public key. 97 98 Returns: 99 True if the signature check passes, otherwise False. 100 """ 101 102 logsig_tmp = tempfile.NamedTemporaryFile() 103 logsig_tmp.write(log_root_sig) 104 logsig_tmp.flush() 105 logroot_tmp = tempfile.NamedTemporaryFile() 106 logroot_tmp.write(log_root) 107 logroot_tmp.flush() 108 109 p = subprocess.Popen(['openssl', 'dgst', '-sha256', '-verify', 110 transparency_log_pub_key, 111 '-signature', logsig_tmp.name, logroot_tmp.name], 112 stdin=subprocess.PIPE, 113 stdout=subprocess.PIPE, 114 stderr=subprocess.PIPE) 115 116 p.communicate() 117 retcode = p.wait() 118 if not retcode: 119 return True 120 return False 121 122 123# AFTL Merkle Tree Functionality 124def rfc6962_hash_leaf(leaf): 125 """RFC6962 hashing function for hashing leaves of a Merkle tree. 126 127 Arguments: 128 leaf: A bytearray containing the Merkle tree leaf to be hashed. 129 130 Returns: 131 A bytearray containing the RFC6962 SHA256 hash of the leaf. 132 """ 133 hasher = hashlib.sha256() 134 # RFC6962 states a '0' byte should be prepended to the data. 135 # This is done in conjunction with the '1' byte for non-leaf 136 # nodes for 2nd preimage attack resistance. 137 hasher.update(b'\x00') 138 hasher.update(leaf) 139 return hasher.digest() 140 141 142def rfc6962_hash_children(l, r): 143 """Calculates the inner Merkle tree node hash of child nodes l and r. 144 145 Arguments: 146 l: A bytearray containing the left child node to be hashed. 147 r: A bytearray containing the right child node to be hashed. 148 149 Returns: 150 A bytearray containing the RFC6962 SHA256 hash of 1|l|r. 151 """ 152 hasher = hashlib.sha256() 153 # RFC6962 states a '1' byte should be prepended to the concatenated data. 154 # This is done in conjunction with the '0' byte for leaf 155 # nodes for 2nd preimage attack resistance. 156 hasher.update(b'\x01') 157 hasher.update(l) 158 hasher.update(r) 159 return hasher.digest() 160 161 162def chain_border_right(seed, proof): 163 """Computes a subtree hash along the left-side tree border. 164 165 Arguments: 166 seed: A bytearray containing the starting hash. 167 proof: A list of bytearrays representing the hashes in the inclusion proof. 168 169 Returns: 170 A bytearray containing the left-side subtree hash. 171 """ 172 for h in proof: 173 seed = rfc6962_hash_children(h, seed) 174 return seed 175 176 177def chain_inner(seed, proof, leaf_index): 178 """Computes a subtree hash on or below the tree's right border. 179 180 Arguments: 181 seed: A bytearray containing the starting hash. 182 proof: A list of bytearrays representing the hashes in the inclusion proof. 183 leaf_index: The current leaf index. 184 185 Returns: 186 A bytearray containing the subtree hash. 187 """ 188 for i, h in enumerate(proof): 189 if leaf_index >> i & 1 == 0: 190 seed = rfc6962_hash_children(seed, h) 191 else: 192 seed = rfc6962_hash_children(h, seed) 193 return seed 194 195 196def root_from_icp(leaf_index, tree_size, proof, leaf_hash): 197 """Calculates the expected Merkle tree root hash. 198 199 Arguments: 200 leaf_index: The current leaf index. 201 tree_size: The number of nodes in the Merkle tree. 202 proof: A list of bytearrays containing the inclusion proof. 203 leaf_hash: A bytearray containing the initial leaf hash. 204 205 Returns: 206 A bytearray containing the calculated Merkle tree root hash. 207 208 Raises: 209 AftlError: If invalid parameters are passed in. 210 """ 211 if leaf_index < 0: 212 raise AftlError('Invalid leaf_index value: {}'.format(leaf_index)) 213 if tree_size < 0: 214 raise AftlError('Invalid tree_size value: {}'.format(tree_size)) 215 if leaf_index >= tree_size: 216 err_str = 'leaf_index cannot be equal or larger than tree_size: {}, {}' 217 raise AftlError(err_str.format(leaf_index, tree_size)) 218 if proof is None: 219 raise AftlError('Inclusion proof not provided.') 220 if leaf_hash is None: 221 raise AftlError('No leaf hash provided.') 222 # Calculate the point to split the proof into two parts. 223 # The split is where the paths to leaves diverge. 224 inner = (leaf_index ^ (tree_size - 1)).bit_length() 225 result = chain_inner(leaf_hash, proof[:inner], leaf_index) 226 result = chain_border_right(result, proof[inner:]) 227 return result 228 229 230class AftlIcpHeader(object): 231 """A class for the transparency log inclusion proof header. 232 233 Attributes: 234 magic: Magic for identifying the ICP header. 235 required_icp_version_major: The major version of AVB that wrote the entry. 236 required_icp_version_minor: The minor version of AVB that wrote the entry. 237 aftl_descriptor_size: Total size of the header's AftlDescriptor. 238 icp_count: Number of inclusion proofs represented in this structure. 239 """ 240 241 SIZE = 18 # The size of the structure, in bytes 242 MAGIC = 'AFTL' 243 FORMAT_STRING = ('!4s2L' # magic, major & minor version 244 'L' # descriptor size 245 'H') # number of inclusion proof entries 246 247 def __init__(self, data=None): 248 """Initializes a new transparency header object. 249 250 Arguments: 251 data: If not None, must be a bytearray of size |SIZE|. 252 253 Raises: 254 AftlError: If invalid structure for AftlIcpHeader. 255 """ 256 assert struct.calcsize(self.FORMAT_STRING) == self.SIZE 257 258 if data: 259 (self.magic, self.required_icp_version_major, 260 self.required_icp_version_minor, self.aftl_descriptor_size, 261 self.icp_count) = struct.unpack(self.FORMAT_STRING, data) 262 else: 263 self.magic = self.MAGIC 264 self.required_icp_version_major = avbtool.AVB_VERSION_MAJOR 265 self.required_icp_version_minor = avbtool.AVB_VERSION_MINOR 266 self.aftl_descriptor_size = self.SIZE 267 self.icp_count = 0 268 if not self.is_valid(): 269 raise AftlError('Invalid structure for AftlIcpHeader') 270 271 def save(self, output): 272 """Serializes the transparency header |SIZE| to disk. 273 274 Arguments: 275 output: The object to write the header to. 276 277 Raises: 278 AftlError: If invalid structure for AftlIcpHeader. 279 """ 280 output.write(self.encode()) 281 282 def encode(self): 283 """Serializes the header |SIZE| to a bytearray(). 284 285 Returns: 286 A bytearray() with the encoded header. 287 288 Raises: 289 AftlError: If invalid structure for AftlIcpHeader. 290 """ 291 if not self.is_valid(): 292 raise AftlError('Invalid structure for AftlIcpHeader') 293 return struct.pack(self.FORMAT_STRING, self.magic, 294 self.required_icp_version_major, 295 self.required_icp_version_minor, 296 self.aftl_descriptor_size, 297 self.icp_count) 298 299 def is_valid(self): 300 """Ensures that values in an AftlIcpHeader structure are sane. 301 302 Returns: 303 True if the values in the AftlIcpHeader are sane, False otherwise. 304 """ 305 if self.magic != AftlIcpHeader.MAGIC: 306 sys.stderr.write( 307 'ICP Header: magic value mismatch: {}\n'.format(self.magic)) 308 return False 309 310 if self.required_icp_version_major > avbtool.AVB_VERSION_MAJOR: 311 sys.stderr.write('ICP header: major version mismatch: {}\n'.format( 312 self.required_icp_version_major)) 313 return False 314 315 if self.required_icp_version_minor > avbtool.AVB_VERSION_MINOR: 316 sys.stderr.write('ICP header: minor version mismatch: {}\n'.format( 317 self.required_icp_version_minor)) 318 return False 319 320 if self.aftl_descriptor_size < self.SIZE: 321 sys.stderr.write('ICP Header: Invalid descriptor size: {}\n'.format( 322 self.aftl_descriptor_size)) 323 return False 324 325 if self.icp_count < 0 or self.icp_count > 65535: 326 sys.stderr.write( 327 'ICP header: ICP entry count out of range: {}\n'.format( 328 self.icp_count)) 329 return False 330 return True 331 332 def print_desc(self, o): 333 """Print the descriptor. 334 335 Arguments: 336 o: The object to write the output to. 337 """ 338 i = ' ' * 4 339 fmt = '{}{:25}{}\n' 340 o.write(fmt.format(i, 'Major version:', self.required_icp_version_major)) 341 o.write(fmt.format(i, 'Minor version:', self.required_icp_version_minor)) 342 o.write(fmt.format(i, 'Descriptor size:', self.aftl_descriptor_size)) 343 o.write(fmt.format(i, 'ICP entries count:', self.icp_count)) 344 345 346class AftlIcpEntry(object): 347 """A class for the transparency log inclusion proof entries. 348 349 The data that represents each of the components of the ICP entry are stored 350 immediately following the ICP entry header. The format is log_url, 351 SignedLogRoot, and inclusion proof hashes. 352 353 Attributes: 354 log_url_size: Length of the string representing the transparency log URL. 355 leaf_index: Leaf index in the transparency log representing this entry. 356 log_root_descriptor_size: Size of the transparency log's SignedLogRoot. 357 fw_info_leaf_size: Size of the FirmwareInfo leaf passed to the log. 358 log_root_sig_size: Size in bytes of the log_root_signature 359 proof_hash_count: Number of hashes comprising the inclusion proof. 360 inc_proof_size: The total size of the inclusion proof, in bytes. 361 log_url: The URL for the transparency log that generated this inclusion 362 proof. 363 log_root_descriptor: The data comprising the signed tree head structure. 364 fw_info_leaf: The data comprising the FirmwareInfo leaf. 365 log_root_signature: The data comprising the log root signature. 366 proofs: The hashes comprising the inclusion proof. 367 368 """ 369 SIZE = 27 # The size of the structure, in bytes 370 FORMAT_STRING = ('!L' # transparency log server url size 371 'Q' # leaf index 372 'L' # log root descriptor size 373 'L' # firmware info leaf size 374 'H' # log root signature size 375 'B' # number of hashes in the inclusion proof 376 'L') # size of the inclusion proof in bytes 377 # These are used to capture the log_url, log_root_descriptor, 378 # fw_info leaf, log root signature, and the proofs elements for the 379 # encode & save functions. 380 381 def __init__(self, data=None): 382 """Initializes a new ICP entry object. 383 384 Arguments: 385 data: If not None, must be a bytearray of size >= |SIZE|. 386 387 Raises: 388 AftlError: If data does not represent a well-formed AftlIcpEntry. 389 """ 390 # Assert the header structure is of a sane size. 391 assert struct.calcsize(self.FORMAT_STRING) == self.SIZE 392 393 if data: 394 # Deserialize the header from the data descriptor. 395 (self._log_url_size_expected, 396 self.leaf_index, 397 self._log_root_descriptor_size_expected, 398 self._fw_info_leaf_size_expected, 399 self._log_root_sig_size_expected, 400 self._proof_hash_count_expected, 401 self._inc_proof_size_expected) = struct.unpack(self.FORMAT_STRING, 402 data[0:self.SIZE]) 403 404 # Deserialize ICP entry components from the data. 405 expected_format_string = '{}s{}s{}s{}s{}s'.format( 406 self._log_url_size_expected, 407 self._log_root_descriptor_size_expected, 408 self._fw_info_leaf_size_expected, 409 self._log_root_sig_size_expected, 410 self._inc_proof_size_expected) 411 412 (self.log_url, log_root_descriptor_bytes, fw_info_leaf_bytes, 413 self.log_root_signature, proof_bytes) = struct.unpack( 414 expected_format_string, data[self.SIZE:self.get_expected_size()]) 415 416 self.log_root_descriptor = TrillianLogRootDescriptor( 417 log_root_descriptor_bytes) 418 self.fw_info_leaf = FirmwareInfoLeaf(fw_info_leaf_bytes) 419 420 self.proofs = [] 421 if self._proof_hash_count_expected > 0: 422 proof_idx = 0 423 hash_size = self._inc_proof_size_expected // self._proof_hash_count_expected 424 for _ in range(self._proof_hash_count_expected): 425 proof = proof_bytes[proof_idx:(proof_idx+hash_size)] 426 self.proofs.append(proof) 427 proof_idx += hash_size 428 else: 429 self.leaf_index = 0 430 self.log_url = '' 431 self.log_root_descriptor = TrillianLogRootDescriptor() 432 self.fw_info_leaf = FirmwareInfoLeaf() 433 self.log_root_signature = '' 434 self.proofs = [] 435 if not self.is_valid(): 436 raise AftlError('Invalid structure for AftlIcpEntry') 437 438 @property 439 def log_url_size(self): 440 """Gets the size of the log_url attribute.""" 441 if hasattr(self, 'log_url'): 442 return len(self.log_url) 443 return self._log_url_size_expected 444 445 @property 446 def log_root_descriptor_size(self): 447 """Gets the size of the log_root_descriptor attribute.""" 448 if hasattr(self, 'log_root_descriptor'): 449 return self.log_root_descriptor.get_expected_size() 450 return self._log_root_descriptor_size_expected 451 452 @property 453 def fw_info_leaf_size(self): 454 """Gets the size of the fw_info_leaf attribute.""" 455 if hasattr(self, 'fw_info_leaf'): 456 return self.fw_info_leaf.get_expected_size() 457 return self._fw_info_leaf_size_expected 458 459 @property 460 def log_root_sig_size(self): 461 """Gets the size of the log_root signature.""" 462 if hasattr(self, 'log_root_signature'): 463 return len(self.log_root_signature) 464 return self._log_root_sig_size_expected 465 466 @property 467 def proof_hash_count(self): 468 """Gets the number of proof hashes.""" 469 if hasattr(self, 'proofs'): 470 return len(self.proofs) 471 return self._proof_hash_count_expected 472 473 @property 474 def inc_proof_size(self): 475 """Gets the total size of the proof hashes in bytes.""" 476 if hasattr(self, 'proofs'): 477 result = 0 478 for proof in self.proofs: 479 result += len(proof) 480 return result 481 return self._inc_proof_size_expected 482 483 def verify_icp(self, transparency_log_pub_key): 484 """Verifies the contained inclusion proof given the public log key. 485 486 Arguments: 487 transparency_log_pub_key: The path to the trusted public key for the log. 488 489 Returns: 490 True if the calculated signature matches AftlIcpEntry's. False otherwise. 491 """ 492 if not transparency_log_pub_key: 493 return False 494 495 leaf_hash = rfc6962_hash_leaf(self.fw_info_leaf.encode()) 496 calc_root = root_from_icp(self.leaf_index, 497 self.log_root_descriptor.tree_size, 498 self.proofs, 499 leaf_hash) 500 if ((calc_root == self.log_root_descriptor.root_hash) and 501 check_signature( 502 self.log_root_descriptor.encode(), 503 self.log_root_signature, 504 transparency_log_pub_key)): 505 return True 506 return False 507 508 def verify_vbmeta_image(self, vbmeta_descriptor, transparency_log_pub_key): 509 """Verify the inclusion proof for the given vbmeta_descriptor. 510 511 Arguments: 512 vbmeta_descriptor: A bytearray with the vbmeta descriptor. 513 transparency_log_pub_key: File path to the PEM file containing the trusted 514 transparency log public key. 515 516 Returns: 517 True if the inclusion proof validates and the vbmeta hash of the given 518 descriptor matches the one in the fw_info_leaf; otherwise False. 519 """ 520 if not vbmeta_descriptor: 521 return False 522 523 # Calculate the hash of the vbmeta image. 524 vbmeta_hash = hashlib.sha256(vbmeta_descriptor).digest() 525 526 # Validates the inclusion proof and then compare the calculated vbmeta_hash 527 # against the one in the inclusion proof. 528 return (self.verify_icp(transparency_log_pub_key) 529 and self.fw_info_leaf.vbmeta_hash == vbmeta_hash) 530 531 def save(self, output): 532 """Serializes the transparency header |SIZE| and data to disk. 533 534 Arguments: 535 output: The object to write the header to. 536 537 Raises: 538 AftlError: If invalid entry structure. 539 """ 540 output.write(self.encode()) 541 542 def encode(self): 543 """Serializes the header |SIZE| and data to a bytearray(). 544 545 Returns: 546 A bytearray() with the encoded header. 547 548 Raises: 549 AftlError: If invalid entry structure. 550 """ 551 proof_bytes = bytearray() 552 if not self.is_valid(): 553 raise AftlError('Invalid AftlIcpEntry structure') 554 555 expected_format_string = '{}{}s{}s{}s{}s{}s'.format( 556 self.FORMAT_STRING, 557 self.log_url_size, 558 self.log_root_descriptor_size, 559 self.fw_info_leaf_size, 560 self.log_root_sig_size, 561 self.inc_proof_size) 562 563 for proof in self.proofs: 564 proof_bytes.extend(proof) 565 566 return struct.pack(expected_format_string, 567 self.log_url_size, self.leaf_index, 568 self.log_root_descriptor_size, self.fw_info_leaf_size, 569 self.log_root_sig_size, self.proof_hash_count, 570 self.inc_proof_size, self.log_url, 571 self.log_root_descriptor.encode(), 572 str(self.fw_info_leaf.encode()), 573 str(self.log_root_signature), 574 str(proof_bytes)) 575 576 def translate_response(self, transparency_log, afi_response): 577 """Translates an AddFirmwareInfoResponse object to an AftlIcpEntry. 578 579 Arguments: 580 transparency_log: String representing the transparency log URL. 581 afi_response: The AddFirmwareResponse object to translate. 582 """ 583 self.log_url = transparency_log 584 585 # Deserializes from AddFirmwareInfoResponse. 586 self.leaf_index = afi_response.fw_info_proof.proof.leaf_index 587 self.log_root_descriptor = TrillianLogRootDescriptor( 588 afi_response.fw_info_proof.sth.log_root) 589 self.fw_info_leaf = FirmwareInfoLeaf(afi_response.fw_info_leaf) 590 self.log_root_signature = afi_response.fw_info_proof.sth.log_root_signature 591 self.proofs = afi_response.fw_info_proof.proof.hashes 592 593 def get_expected_size(self): 594 """Gets the expected size of the full entry out of the header. 595 596 Returns: 597 The expected size of the AftlIcpEntry from the header. 598 """ 599 return (self.SIZE + self.log_url_size + self.log_root_descriptor_size + 600 self.fw_info_leaf_size + self.log_root_sig_size + 601 self.inc_proof_size) 602 603 def is_valid(self): 604 """Ensures that values in an AftlIcpEntry structure are sane. 605 606 Returns: 607 True if the values in the AftlIcpEntry are sane, False otherwise. 608 """ 609 if self.leaf_index < 0: 610 sys.stderr.write('ICP entry: leaf index out of range: ' 611 '{}.\n'.format(self.leaf_index)) 612 return False 613 614 if (not self.log_root_descriptor or 615 not isinstance(self.log_root_descriptor, TrillianLogRootDescriptor) or 616 not self.log_root_descriptor.is_valid()): 617 sys.stderr.write('ICP entry: invalid TrillianLogRootDescriptor.\n') 618 return False 619 620 if (not self.fw_info_leaf or 621 not isinstance(self.fw_info_leaf, FirmwareInfoLeaf)): 622 sys.stderr.write('ICP entry: invalid FirmwareInfo.\n') 623 return False 624 return True 625 626 def print_desc(self, o): 627 """Print the descriptor. 628 629 Arguments: 630 o: The object to write the output to. 631 """ 632 i = ' ' * 4 633 fmt = '{}{:25}{}\n' 634 o.write(fmt.format(i, 'Transparency Log:', self.log_url)) 635 o.write(fmt.format(i, 'Leaf index:', self.leaf_index)) 636 o.write(' ICP hashes: ') 637 for i, proof_hash in enumerate(self.proofs): 638 if i != 0: 639 o.write(' ' * 29) 640 o.write('{}\n'.format(binascii.hexlify(proof_hash))) 641 self.log_root_descriptor.print_desc(o) 642 self.fw_info_leaf.print_desc(o) 643 644 645class TrillianLogRootDescriptor(object): 646 """A class representing the Trillian log_root descriptor. 647 648 Taken from Trillian definitions: 649 https://github.com/google/trillian/blob/master/trillian.proto#L255 650 651 Attributes: 652 version: The version number of the descriptor. Currently only version=1 is 653 supported. 654 tree_size: The size of the tree. 655 root_hash_size: The size of the root hash in bytes. Valid values are between 656 0 and 128. 657 root_hash: The root hash as bytearray(). 658 timestamp: The timestamp in nanoseconds. 659 revision: The revision number as long. 660 metadata_size: The size of the metadata in bytes. Valid values are between 661 0 and 65535. 662 metadata: The metadata as bytearray(). 663 """ 664 FORMAT_STRING_PART_1 = ('!H' # version 665 'Q' # tree_size 666 'B' # root_hash_size 667 ) 668 669 FORMAT_STRING_PART_2 = ('!Q' # timestamp 670 'Q' # revision 671 'H' # metadata_size 672 ) 673 674 def __init__(self, data=None): 675 """Initializes a new TrillianLogRoot descriptor.""" 676 if data: 677 # Parses first part of the log_root descriptor. 678 data_length = struct.calcsize(self.FORMAT_STRING_PART_1) 679 (self.version, self.tree_size, self.root_hash_size) = struct.unpack( 680 self.FORMAT_STRING_PART_1, data[0:data_length]) 681 data = data[data_length:] 682 683 # Parses the root_hash bytes if the size indicates existance. 684 if self.root_hash_size > 0: 685 self.root_hash = data[0:self.root_hash_size] 686 data = data[self.root_hash_size:] 687 else: 688 self.root_hash = bytearray() 689 690 # Parses second part of the log_root descriptor. 691 data_length = struct.calcsize(self.FORMAT_STRING_PART_2) 692 (self.timestamp, self.revision, self.metadata_size) = struct.unpack( 693 self.FORMAT_STRING_PART_2, data[0:data_length]) 694 data = data[data_length:] 695 696 # Parses the metadata if the size indicates existance. 697 if self.metadata_size > 0: 698 self.metadata = data[0:self.metadata_size] 699 else: 700 self.metadata = bytearray() 701 else: 702 self.version = 1 703 self.tree_size = 0 704 self.root_hash_size = 0 705 self.root_hash = bytearray() 706 self.timestamp = 0 707 self.revision = 0 708 self.metadata_size = 0 709 self.metadata = bytearray() 710 711 if not self.is_valid(): 712 raise AftlError('Invalid structure for TrillianLogRootDescriptor.') 713 714 def get_expected_size(self): 715 """Calculates the expected size of the TrillianLogRootDescriptor. 716 717 Returns: 718 The expected size of the TrillianLogRootDescriptor. 719 """ 720 return (struct.calcsize(self.FORMAT_STRING_PART_1) + self.root_hash_size + 721 struct.calcsize(self.FORMAT_STRING_PART_2) + self.metadata_size) 722 723 def encode(self): 724 """Serializes the TrillianLogDescriptor to a bytearray(). 725 726 Returns: 727 A bytearray() with the encoded header. 728 729 Raises: 730 AftlError: If invalid entry structure. 731 """ 732 if not self.is_valid(): 733 raise AftlError('Invalid structure for TrillianLogRootDescriptor.') 734 735 expected_format_string = '{}{}s{}{}s'.format( 736 self.FORMAT_STRING_PART_1, 737 self.root_hash_size, 738 self.FORMAT_STRING_PART_2[1:], 739 self.metadata_size) 740 741 return struct.pack(expected_format_string, 742 self.version, self.tree_size, self.root_hash_size, 743 str(self.root_hash), self.timestamp, self.revision, 744 self.metadata_size, str(self.metadata)) 745 746 def is_valid(self): 747 """Ensures that values in the descritor are sane. 748 749 Returns: 750 True if the values are sane; otherwise False. 751 """ 752 cls = self.__class__.__name__ 753 if self.version != 1: 754 sys.stderr.write('{}: Bad version value {}.\n'.format(cls, self.version)) 755 return False 756 if self.tree_size < 0: 757 sys.stderr.write('{}: Bad tree_size value {}.\n'.format(cls, 758 self.tree_size)) 759 return False 760 if self.root_hash_size < 0 or self.root_hash_size > 128: 761 sys.stderr.write('{}: Bad root_hash_size value {}.\n'.format( 762 cls, self.root_hash_size)) 763 return False 764 if len(self.root_hash) != self.root_hash_size: 765 sys.stderr.write('{}: root_hash_size {} does not match with length of ' 766 'root_hash {}.\n'.format(cls, self.root_hash_size, 767 len(self.root_hash))) 768 return False 769 if self.timestamp < 0: 770 sys.stderr.write('{}: Bad timestamp value {}.\n'.format(cls, 771 self.timestamp)) 772 return False 773 if self.revision < 0: 774 sys.stderr.write('{}: Bad revision value {}.\n'.format(cls, 775 self.revision)) 776 return False 777 if self.metadata_size < 0 or self.metadata_size > 65535: 778 sys.stderr.write('{}: Bad metadatasize value {}.\n'.format( 779 cls, self.metadata_size)) 780 return False 781 if len(self.metadata) != self.metadata_size: 782 sys.stderr.write('{}: metadata_size {} does not match with length of' 783 'metadata {}.\n'.format(cls, self.metadata_size, 784 len(self.metadata))) 785 return False 786 return True 787 788 def print_desc(self, o): 789 """Print the descriptor. 790 791 Arguments: 792 o: The object to write the output to. 793 """ 794 o.write(' Log Root Descriptor:\n') 795 i = ' ' * 6 796 fmt = '{}{:23}{}\n' 797 o.write(fmt.format(i, 'Version:', self.version)) 798 o.write(fmt.format(i, 'Tree size:', self.tree_size)) 799 o.write(fmt.format(i, 'Root hash size:', self.root_hash_size)) 800 if self.root_hash_size > 0: 801 o.write(fmt.format(i, 'Root hash:', binascii.hexlify(self.root_hash))) 802 o.write(fmt.format(i, 'Timestamp (ns):', self.timestamp)) 803 o.write(fmt.format(i, 'Revision:', self.revision)) 804 o.write(fmt.format(i, 'Metadata size:', self.metadata_size)) 805 if self.metadata_size > 0: 806 o.write(fmt.format(i, 'Metadata:', binascii.hexlify(self.metadata))) 807 808 809class FirmwareInfoLeaf(object): 810 """A class representing the FirmwareInfo leaf. 811 812 AFTL returns the fw_info_leaf as a JSON blob and this class is able to 813 parse the blog and provide necessary attributes needed for validation. 814 815 Attributes: 816 vbmeta_hash: This is the SHA256 hash of vbmeta. 817 version_incremental: Subcomponent of the build fingerprint as defined at 818 https://source.android.com/compatibility/android-cdd#3_2_2_build_parameters. 819 For example, a Pixel device with the following build fingerprint 820 google/crosshatch/crosshatch:9/PQ3A.190605.003/5524043:user/release-keys, 821 would have 5524043 for the version incremental. 822 platform_key: Public key of the platform. This is the same key used to sign 823 the vbmeta. 824 manufacturer_key_hash: SHA256 of the manufacturer public key (DER-encoded, 825 x509 subjectPublicKeyInfo format). The public key MUST already be in the 826 list of root keys known and trusted by the AFTL. 827 description: Free form description field. It can be used to annotate this 828 message with further context on the build (e.g., carrier specific build). 829 """ 830 831 def __init__(self, data=None): 832 """Initializes a new FirmwareInfoLeaf descriptor.""" 833 if data: 834 # We have to preserve the original fw_info_leaf bytes in order to preserve 835 # hash equivalence with what is stored in the Trillian log and matches up 836 # with the proofs. 837 self._fw_info_leaf_bytes = data 838 839 # Deserialize the JSON blob and keep only the FirmwareInfo parts. 840 try: 841 fw_info_leaf = json.loads(self._fw_info_leaf_bytes) 842 self._fw_info_leaf_dict = ( 843 fw_info_leaf['Value']['FwInfo']['info']['info']) 844 except (ValueError, KeyError) as e: 845 raise AftlError('Invalid structure for FirmwareInfoLeaf: {}'.format(e)) 846 else: 847 self._fw_info_leaf_bytes = '' 848 self._fw_info_leaf_dict = dict() 849 850 if not self.is_valid(): 851 raise AftlError('Invalid structure for FirmwareInfoLeaf.') 852 853 @property 854 def vbmeta_hash(self): 855 """Gets the vbmeta_hash attribute.""" 856 return self._lookup_base64_attribute('vbmeta_hash') 857 858 @property 859 def version_incremental(self): 860 """Gets the version_incremental attribute.""" 861 return self._fw_info_leaf_dict.get('version_incremental') 862 863 @property 864 def platform_key(self): 865 """Gets the platform key attribute.""" 866 return self._lookup_base64_attribute('platform_key') 867 868 @property 869 def manufacturer_key_hash(self): 870 """Gets the manufacturer_key_hash attribute.""" 871 return self._lookup_base64_attribute('manufacturer_key_hash') 872 873 @property 874 def description(self): 875 """Gets the description attribute.""" 876 return self._fw_info_leaf_dict.get('description') 877 878 def _lookup_base64_attribute(self, key): 879 """Looks up an attribute that is Base64 encoded and decodes it. 880 881 Arguments: 882 key: The name of the attribute to look up. 883 884 Returns: 885 The attribute value or None if not defined. 886 """ 887 result = self._fw_info_leaf_dict.get(key) 888 if result: 889 result = base64.b64decode(result) 890 return result 891 892 def get_expected_size(self): 893 """Gets the expected size of the JSON-serialized FirmwareInfoLeaf. 894 895 Returns: 896 The expected size of the FirmwareInfo leaf in byte or 0 if not initalized. 897 """ 898 if not self._fw_info_leaf_bytes: 899 return 0 900 return len(self._fw_info_leaf_bytes) 901 902 def encode(self): 903 """Serializes the FirmwareInfoLeaf. 904 905 Returns: 906 A bytearray() with the JSON-serialized FirmwareInfoLeaf. 907 """ 908 return self._fw_info_leaf_bytes 909 910 def is_valid(self): 911 """Ensures that values in the descritor are sane. 912 913 Returns: 914 True if the values are sane; otherwise False. 915 """ 916 # Checks that the decode fw_info_leaf at max contains values defined in the 917 # FirmwareInfo proto buf. 918 expected_fields = set(aftl_pb2.FirmwareInfo() 919 .DESCRIPTOR.fields_by_name.keys()) 920 actual_fields = set(self._fw_info_leaf_dict.keys()) 921 if actual_fields.issubset(expected_fields): 922 return True 923 return False 924 925 def print_desc(self, o): 926 """Print the descriptor. 927 928 Arguments: 929 o: The object to write the output to. 930 """ 931 o.write(' Firmware Info Leaf:\n') 932 # The order of the fields is based on the definition in 933 # proto.aftl_pb2.FirmwareInfo. 934 i = ' ' * 6 935 fmt = '{}{:23}{}\n' 936 if self.vbmeta_hash: 937 o.write(fmt.format(i, 'VBMeta hash:', binascii.hexlify(self.vbmeta_hash))) 938 if self.version_incremental: 939 o.write(fmt.format(i, 'Version incremental:', self.version_incremental)) 940 if self.platform_key: 941 o.write(fmt.format(i, 'Platform key:', self.platform_key)) 942 if self.manufacturer_key_hash: 943 o.write(fmt.format(i, 'Manufacturer key hash:', 944 binascii.hexlify(self.manufacturer_key_hash))) 945 if self.description: 946 o.write(fmt.format(i, 'Description:', self.description)) 947 948 949class AftlDescriptor(object): 950 """A class for the transparency log inclusion proof descriptor. 951 952 This encapsulates an AFTL ICP section with all information required to 953 validate an inclusion proof. 954 955 Attributes: 956 icp_header: A header for the section. 957 icp_entries: A list of AftlIcpEntry objects representing the inclusion 958 proofs. 959 """ 960 961 def __init__(self, data=None): 962 """Initializes a new AftlDescriptor section. 963 964 Arguments: 965 data: If not None, must be a bytearray representing an AftlDescriptor. 966 967 Raises: 968 AftlError: If the data does not represent a well-formed AftlDescriptor. 969 """ 970 if data: 971 icp_header_bytes = data[0:AftlIcpHeader.SIZE] 972 self.icp_header = AftlIcpHeader(icp_header_bytes) 973 if not self.icp_header.is_valid(): 974 raise AftlError('Invalid ICP header.') 975 icp_count = self.icp_header.icp_count 976 977 # Jump past the header for entry deserialization. 978 icp_index = AftlIcpHeader.SIZE 979 # Validate each entry. 980 self.icp_entries = [] 981 # Add_icp_entry updates entries and header, so set header count to 982 # compensate. 983 self.icp_header.icp_count = 0 984 for i in range(icp_count): 985 # Get the entry header from the AftlDescriptor. 986 cur_icp_entry = AftlIcpEntry(data[icp_index:]) 987 cur_icp_entry_size = cur_icp_entry.get_expected_size() 988 # Now validate the entry structure. 989 if not cur_icp_entry.is_valid(): 990 raise AftlError('Validation of ICP entry {} failed.'.format(i)) 991 self.add_icp_entry(cur_icp_entry) 992 icp_index += cur_icp_entry_size 993 else: 994 self.icp_header = AftlIcpHeader() 995 self.icp_entries = [] 996 if not self.is_valid(): 997 raise AftlError('Malformed AFTLDescriptor') 998 999 def add_icp_entry(self, icp_entry): 1000 """Adds a new AftlIcpEntry to the AftlDescriptor, updating fields as needed. 1001 1002 Arguments: 1003 icp_entry: An AftlIcpEntry structure. 1004 """ 1005 self.icp_entries.append(icp_entry) 1006 self.icp_header.icp_count += 1 1007 self.icp_header.aftl_descriptor_size += icp_entry.get_expected_size() 1008 1009 def verify_vbmeta_image(self, vbmeta_image, transparency_log_pub_keys): 1010 """Verifies the contained inclusion proof given the public log key. 1011 1012 Arguments: 1013 vbmeta_image: The vbmeta_image that should be verified against the 1014 inclusion proof. 1015 transparency_log_pub_keys: List of paths to PEM files containing trusted 1016 public keys that correspond with the transparency_logs. 1017 1018 Returns: 1019 True if all the inclusion proofs in the AfltDescriptor validate, are 1020 signed by one of the give transparency log public keys; otherwise false. 1021 """ 1022 if not transparency_log_pub_keys or not self.icp_entries: 1023 return False 1024 1025 icp_verified = 0 1026 for icp_entry in self.icp_entries: 1027 verified = False 1028 for pub_key in transparency_log_pub_keys: 1029 if icp_entry.verify_vbmeta_image(vbmeta_image, pub_key): 1030 verified = True 1031 break 1032 if verified: 1033 icp_verified += 1 1034 return icp_verified == len(self.icp_entries) 1035 1036 def save(self, output): 1037 """Serializes the AftlDescriptor to disk. 1038 1039 Arguments: 1040 output: The object to write the descriptor to. 1041 1042 Raises: 1043 AftlError: If invalid descriptor structure. 1044 """ 1045 output.write(self.encode()) 1046 1047 def encode(self): 1048 """Serialize the AftlDescriptor to a bytearray(). 1049 1050 Returns: 1051 A bytearray() with the encoded header. 1052 1053 Raises: 1054 AftlError: If invalid descriptor structure. 1055 """ 1056 # The header and entries are guaranteed to be valid when encode is called. 1057 # Check the entire structure as a whole. 1058 if not self.is_valid(): 1059 raise AftlError('Invalid AftlDescriptor structure.') 1060 1061 icp_descriptor = bytearray() 1062 icp_descriptor.extend(self.icp_header.encode()) 1063 for icp_entry in self.icp_entries: 1064 icp_descriptor.extend(icp_entry.encode()) 1065 return icp_descriptor 1066 1067 def is_valid(self): 1068 """Ensures that values in the AftlDescriptor are sane. 1069 1070 Returns: 1071 True if the values in the AftlDescriptor are sane, False otherwise. 1072 """ 1073 if not self.icp_header.is_valid(): 1074 return False 1075 1076 if self.icp_header.icp_count != len(self.icp_entries): 1077 return False 1078 1079 for icp_entry in self.icp_entries: 1080 if not icp_entry.is_valid(): 1081 return False 1082 return True 1083 1084 def print_desc(self, o): 1085 """Print the descriptor. 1086 1087 Arguments: 1088 o: The object to write the output to. 1089 """ 1090 o.write('Android Firmware Transparency Descriptor:\n') 1091 o.write(' Header:\n') 1092 self.icp_header.print_desc(o) 1093 for i, icp_entry in enumerate(self.icp_entries): 1094 o.write(' Entry #{}:\n'.format(i + 1)) 1095 icp_entry.print_desc(o) 1096 1097 1098class AftlCommunication(object): 1099 """Class to abstract the communication layer with the transparency log.""" 1100 1101 def __init__(self, transparency_log, timeout): 1102 """Initializes the object. 1103 1104 Arguments: 1105 transparency_log: String containing the URL of a transparency log server. 1106 timeout: Duration in seconds before requests to the AFTL times out. A 1107 value of 0 or None means there will be no timeout. 1108 """ 1109 self.transparency_log = transparency_log 1110 if timeout: 1111 self.timeout = timeout 1112 else: 1113 self.timeout = None 1114 1115 def add_firmware_info(self, request): 1116 """Calls the AddFirmwareInfo RPC on the AFTL server. 1117 1118 Arguments: 1119 request: A AddFirmwareInfoRequest message. 1120 1121 Returns: 1122 An AddFirmwareInfoReponse message. 1123 1124 Raises: 1125 AftlError: If grpc or the proto modules cannot be loaded, if there is an 1126 error communicating with the log. 1127 """ 1128 raise NotImplementedError( 1129 'AddFirmwareInfo() needs to be implemented by subclass.') 1130 1131 1132class AftlGrpcCommunication(AftlCommunication): 1133 """Class that implements GRPC communication to the AFTL server.""" 1134 1135 def add_firmware_info(self, request): 1136 """Calls the AddFirmwareInfo RPC on the AFTL server. 1137 1138 Arguments: 1139 request: A AddFirmwareInfoRequest message. 1140 1141 Returns: 1142 An AddFirmwareInfoReponse message. 1143 1144 Raises: 1145 AftlError: If grpc or the proto modules cannot be loaded, if there is an 1146 error communicating with the log. 1147 """ 1148 # Import grpc now to avoid global dependencies as it otherwise breakes 1149 # running unittest with atest. 1150 try: 1151 import grpc 1152 from proto import api_pb2_grpc 1153 except ImportError as e: 1154 err_str = 'grpc can be installed with python pip install grpcio.\n' 1155 raise AftlError('Failed to import module: ({}).\n{}'.format(e, err_str)) 1156 1157 # Set up the gRPC channel with the transparency log. 1158 sys.stdout.write('Preparing to request inclusion proof from {}. This could ' 1159 'take ~30 seconds for the process to complete.\n'.format( 1160 self.transparency_log)) 1161 channel = grpc.insecure_channel(self.transparency_log) 1162 stub = api_pb2_grpc.AFTLogStub(channel) 1163 1164 # Attempt to transmit to the transparency log. 1165 sys.stdout.write('ICP is about to be requested from transparency log ' 1166 'with domain {}.\n'.format(self.transparency_log)) 1167 try: 1168 response = stub.AddFirmwareInfo(request, timeout=self.timeout) 1169 except grpc.RpcError as e: 1170 raise AftlError('Error: grpc failure ({})'.format(e)) 1171 return response 1172 1173 1174class Aftl(avbtool.Avb): 1175 """Business logic for aftltool command-line tool.""" 1176 1177 def get_vbmeta_image(self, image_filename): 1178 """Gets the VBMeta struct bytes from image. 1179 1180 Arguments: 1181 image_filename: Image file to get information from. 1182 1183 Returns: 1184 A tuple with following elements: 1185 1. A bytearray with the vbmeta structure or None if the file does not 1186 contain a VBMeta structure. 1187 2. The VBMeta image footer. 1188 """ 1189 # Reads and parses the vbmeta image. 1190 try: 1191 image = avbtool.ImageHandler(image_filename) 1192 except (IOError, ValueError) as e: 1193 sys.stderr.write('The image does not contain a valid VBMeta structure: ' 1194 '{}.\n'.format(e)) 1195 return None, None 1196 1197 try: 1198 (footer, header, _, _) = self._parse_image(image) 1199 except avbtool.AvbError as e: 1200 sys.stderr.write('The image cannot be parsed: {}.\n'.format(e)) 1201 return None, None 1202 1203 # Seeks for the start of the vbmeta image and calculates its size. 1204 offset = 0 1205 if footer: 1206 offset = footer.vbmeta_offset 1207 vbmeta_image_size = (offset + header.SIZE 1208 + header.authentication_data_block_size 1209 + header.auxiliary_data_block_size) 1210 1211 # Reads the vbmeta image bytes. 1212 try: 1213 image.seek(offset) 1214 except RuntimeError as e: 1215 sys.stderr.write('Given vbmeta image offset is invalid: {}.\n'.format(e)) 1216 return None, None 1217 return image.read(vbmeta_image_size), footer 1218 1219 def get_aftl_descriptor(self, image_filename): 1220 """Gets the AftlDescriptor from image. 1221 1222 Arguments: 1223 image_filename: Image file to get information from. 1224 1225 Returns: 1226 An AftlDescriptor or None if the file does not contain a AftlDescriptor. 1227 """ 1228 # Reads the vbmeta image bytes. 1229 vbmeta_image, _ = self.get_vbmeta_image(image_filename) 1230 if not vbmeta_image: 1231 return None 1232 1233 try: 1234 image = avbtool.ImageHandler(image_filename) 1235 except ValueError as e: 1236 sys.stderr.write('The image does not contain a valid VBMeta structure: ' 1237 '{}.\n'.format(e)) 1238 return None 1239 1240 # Seeks for the start of the AftlDescriptor. 1241 try: 1242 image.seek(len(vbmeta_image)) 1243 except RuntimeError as e: 1244 sys.stderr.write('Given AftlDescriptor image offset is invalid: {}.\n' 1245 .format(e)) 1246 return None 1247 1248 # Parses the header for the AftlDescriptor size. 1249 tmp_header_bytes = image.read(AftlIcpHeader.SIZE) 1250 if not tmp_header_bytes or len(tmp_header_bytes) != AftlIcpHeader.SIZE: 1251 sys.stderr.write('This image does not contain a AftlDescriptor.\n') 1252 return None 1253 1254 try: 1255 tmp_header = AftlIcpHeader(tmp_header_bytes) 1256 except AftlError as e: 1257 sys.stderr.write('This image does not contain a valid AftlDescriptor: ' 1258 '{}.\n'.format(e)) 1259 return None 1260 1261 # Resets to the beginning of the AftlDescriptor. 1262 try: 1263 image.seek(len(vbmeta_image)) 1264 except RuntimeError as e: 1265 sys.stderr.write('Given AftlDescriptor image offset is invalid: {}.\n' 1266 .format(e)) 1267 return None 1268 1269 # Parses the full AftlDescriptor. 1270 icp_bytes = image.read(tmp_header.aftl_descriptor_size) 1271 aftl_descriptor = None 1272 try: 1273 aftl_descriptor = AftlDescriptor(icp_bytes) 1274 except AftlError as e: 1275 sys.stderr.write('The image does not contain a valid AftlDescriptor: ' 1276 '{}.\n'.format(e)) 1277 return aftl_descriptor 1278 1279 def info_image_icp(self, vbmeta_image_path, output): 1280 """Implements the 'info_image_icp' command. 1281 1282 Arguments: 1283 vbmeta_image_path: Image file to get information from. 1284 output: Output file to write human-readable information to (file object). 1285 1286 Returns: 1287 True if the given image has an AftlDescriptor and could successfully 1288 be processed; otherwise False. 1289 """ 1290 aftl_descriptor = self.get_aftl_descriptor(vbmeta_image_path) 1291 if not aftl_descriptor: 1292 return False 1293 aftl_descriptor.print_desc(output) 1294 return True 1295 1296 def verify_image_icp(self, vbmeta_image_path, transparency_log_pub_keys, 1297 output): 1298 """Implements the 'verify_image_icp' command. 1299 1300 Arguments: 1301 vbmeta_image_path: Image file to get information from. 1302 transparency_log_pub_keys: List of paths to PEM files containing trusted 1303 public keys that correspond with the transparency_logs. 1304 output: Output file to write human-readable information to (file object). 1305 1306 Returns: 1307 True if for the given image the inclusion proof validates; otherwise 1308 False. 1309 """ 1310 vbmeta_image, _ = self.get_vbmeta_image(vbmeta_image_path) 1311 aftl_descriptor = self.get_aftl_descriptor(vbmeta_image_path) 1312 if not aftl_descriptor or not vbmeta_image: 1313 return False 1314 verified = aftl_descriptor.verify_vbmeta_image(vbmeta_image, 1315 transparency_log_pub_keys) 1316 if not verified: 1317 sys.stderr.write('The inclusion proofs for the image do not validate.\n') 1318 return False 1319 sys.stdout.write('The inclusion proofs for the image successfully ' 1320 'validate.\n') 1321 return True 1322 1323 def request_inclusion_proof(self, transparency_log, vbmeta_descriptor, 1324 version_inc, manufacturer_key_path, 1325 signing_helper, signing_helper_with_files, 1326 timeout, aftl_comms=None): 1327 """Packages and sends a request to the specified transparency log. 1328 1329 Arguments: 1330 transparency_log: String containing the URL of a transparency log server. 1331 vbmeta_descriptor: A bytearray with the vbmeta descriptor. 1332 version_inc: Subcomponent of the build fingerprint. 1333 manufacturer_key_path: Path to key used to sign messages sent to the 1334 transparency log servers. 1335 signing_helper: Program which signs a hash and returns a signature. 1336 signing_helper_with_files: Same as signing_helper but uses files instead. 1337 timeout: Duration in seconds before requests to the transparency log 1338 timeout. 1339 aftl_comms: A subclass of the AftlCommunication class. The default is 1340 to use AftlGrpcCommunication. 1341 1342 Returns: 1343 An AftlIcpEntry with the inclusion proof for the log entry. 1344 1345 Raises: 1346 AftlError: If grpc or the proto modules cannot be loaded, if there is an 1347 error communicating with the log, if the manufacturer_key_path 1348 cannot be decoded, or if the log submission cannot be signed. 1349 """ 1350 # Calculate the hash of the vbmeta image. 1351 vbmeta_hash = hashlib.sha256(vbmeta_descriptor).digest() 1352 1353 # Extract the key data from the PEM file if of size 4096. 1354 manufacturer_key = avbtool.RSAPublicKey(manufacturer_key_path) 1355 if manufacturer_key.num_bits != 4096: 1356 raise AftlError('Manufacturer keys not of size 4096: {}'.format( 1357 manufacturer_key.num_bits)) 1358 manufacturer_key_data = rsa_key_read_pem_bytes(manufacturer_key_path) 1359 1360 # Calculate the hash of the manufacturer key data. 1361 m_key_hash = hashlib.sha256(manufacturer_key_data).digest() 1362 1363 # Create an AddFirmwareInfoRequest protobuf for transmission to AFTL. 1364 fw_info = aftl_pb2.FirmwareInfo(vbmeta_hash=vbmeta_hash, 1365 version_incremental=version_inc, 1366 manufacturer_key_hash=m_key_hash) 1367 signed_fw_info = bytearray() 1368 # AFTL supports SHA256_RSA4096 for now, more will be available. 1369 algorithm_name = 'SHA256_RSA4096' 1370 sig_num_bytes = 0 1371 alg_padding = '' 1372 try: 1373 alg = avbtool.ALGORITHMS[algorithm_name] 1374 sig_num_bytes = alg.signature_num_bytes 1375 alg_padding = alg.padding 1376 except KeyError: 1377 raise AftlError('Unknown algorithm with name {}'.format(algorithm_name)) 1378 1379 fw_info_hash = hashlib.sha256(fw_info.SerializeToString()).digest() 1380 padding_and_hash = str(bytearray(alg_padding)) + fw_info_hash 1381 try: 1382 signed_fw_info = avbtool.raw_sign(signing_helper, 1383 signing_helper_with_files, 1384 algorithm_name, 1385 sig_num_bytes, 1386 manufacturer_key_path, 1387 padding_and_hash) 1388 except avbtool.AvbError as e: 1389 raise AftlError('Failed to sign FirmwareInfo with ' 1390 '--manufacturer_key: {}'.format(e)) 1391 fw_info_sig = sigpb.sigpb_pb2.DigitallySigned( 1392 hash_algorithm='SHA256', 1393 signature_algorithm='RSA', 1394 signature=str(signed_fw_info)) 1395 1396 sfw_info = aftl_pb2.SignedFirmwareInfo(info=fw_info, 1397 info_signature=fw_info_sig) 1398 request = api_pb2.AddFirmwareInfoRequest(vbmeta=bytes( 1399 str(vbmeta_descriptor)), fw_info=sfw_info) 1400 1401 # Submit signed FirmwareInfo to the server. 1402 if not aftl_comms: 1403 aftl_comms = AftlGrpcCommunication(transparency_log, timeout) 1404 response = aftl_comms.add_firmware_info(request) 1405 1406 # Return an AftlIcpEntry representing this response. 1407 icp_entry = AftlIcpEntry() 1408 icp_entry.translate_response(transparency_log, response) 1409 return icp_entry 1410 1411 def make_icp_from_vbmeta(self, vbmeta_image_path, output, 1412 signing_helper, signing_helper_with_files, 1413 version_incremental, transparency_log_servers, 1414 transparency_log_pub_keys, manufacturer_key, 1415 padding_size, timeout): 1416 """Generates a vbmeta image with inclusion proof given a vbmeta image. 1417 1418 The descriptor (struct AftlDescriptor) contains the information required to 1419 validate an inclusion proof for a specific vbmeta image. It consists 1420 of a header (struct AftlIcpHeader) and zero or more entry structures 1421 (struct AftlIcpEntry) that contain the vbmeta leaf hash, tree size, 1422 root hash, inclusion proof hashes, and the signature for the root hash. 1423 1424 The vbmeta image, its hash, the version_incremental part of the build 1425 fingerprint, and the hash of the manufacturer key are sent to the 1426 transparency log, with the message signed by the manufacturer key. 1427 An inclusion proof is calculated and returned. This inclusion proof is 1428 then packaged in a AftlDescriptor structure. The existing vbmeta data is 1429 copied to a new file, appended with the AftlDescriptor data, and written to 1430 output. Validation of the inclusion proof does not require 1431 communication with the transparency log. 1432 1433 Arguments: 1434 vbmeta_image_path: Path to a vbmeta image file. 1435 output: File to write the results to. 1436 signing_helper: Program which signs a hash and returns a signature. 1437 signing_helper_with_files: Same as signing_helper but uses files instead. 1438 version_incremental: A string representing the subcomponent of the 1439 build fingerprint used to identify the vbmeta in the transparency log. 1440 transparency_log_servers: List of strings containing URLs of transparency 1441 log servers where inclusion proofs are requested from. 1442 transparency_log_pub_keys: List of paths to PEM files containing trusted 1443 public keys that correspond with the transparency_logs. There must be 1444 the same number of keys as log servers and they must be in the same 1445 order, that is, transparency_log_pub_keys[n] corresponds to 1446 transparency_log_servers[n]. 1447 manufacturer_key: Path to PEM file containting the key file used to sign 1448 messages sent to the transparency log servers. 1449 padding_size: If not 0, pads output so size is a multiple of the number. 1450 timeout: Duration in seconds before requests to the AFTL times out. A 1451 value of 0 or None means there will be no timeout. 1452 1453 Returns: 1454 True if the inclusion proofs could be fetched from the transparency log 1455 servers and could be successfully validated; otherwise False. 1456 """ 1457 # Validates command line parameters. 1458 if len(transparency_log_servers) != len(transparency_log_pub_keys): 1459 sys.stderr.write('Transparency log count and public key count mismatch: ' 1460 '{} servers and {} public keys.\n'.format( 1461 len(transparency_log_servers), 1462 len(transparency_log_pub_keys))) 1463 return False 1464 transparency_logs = zip(transparency_log_servers, transparency_log_pub_keys) 1465 1466 # Retrieves vbmeta structure from given partition image. 1467 vbmeta_image, footer = self.get_vbmeta_image(vbmeta_image_path) 1468 1469 # Fetches inclusion proofs for vbmeta structure from all transparency logs. 1470 aftl_descriptor = AftlDescriptor() 1471 for host, pub_key in transparency_logs: 1472 try: 1473 icp_entry = self.request_inclusion_proof(host, vbmeta_image, 1474 version_incremental, 1475 manufacturer_key, 1476 signing_helper, 1477 signing_helper_with_files, 1478 timeout) 1479 if not icp_entry.verify_vbmeta_image(vbmeta_image, pub_key): 1480 sys.stderr.write('The inclusion proof from {} could not be verified.' 1481 '\n'.format(host)) 1482 aftl_descriptor.add_icp_entry(icp_entry) 1483 except AftlError as e: 1484 # The inclusion proof request failed. Continue and see if others will. 1485 sys.stderr.write('Requesting inclusion proof failed: {}.\n'.format(e)) 1486 continue 1487 1488 # Checks that the resulting AftlDescriptor is sane. 1489 if aftl_descriptor.icp_header.icp_count != len(transparency_log_pub_keys): 1490 sys.stderr.write('Valid inclusion proofs could only be retrieved from {} ' 1491 'out of {} transparency logs.\n' 1492 .format(aftl_descriptor.icp_header.icp_count, 1493 len(transparency_log_pub_keys))) 1494 return False 1495 if not aftl_descriptor.is_valid(): 1496 sys.stderr.write('Resulting AftlDescriptor structure is malformed.\n') 1497 return False 1498 if not aftl_descriptor.verify_vbmeta_image(vbmeta_image, 1499 transparency_log_pub_keys): 1500 sys.stderr.write('Resulting AftlDescriptor inclusion proofs do not ' 1501 'validate.\n') 1502 return False 1503 1504 # Write the original vbmeta descriptor, followed by the AftlDescriptor. 1505 if footer: # Checks if it is a chained partition. 1506 # TODO(b/147217370): Determine the best way to handle chained partitions 1507 # like the system.img. Currently, we only put the main vbmeta.img in the 1508 # transparency log. 1509 sys.stderr.write('Image has a footer and ICP for this format is not ' 1510 'implemented.\n') 1511 return False 1512 1513 # Writes vbmeta image with inclusion proof into a new vbmeta image. 1514 output.seek(0) 1515 output.write(vbmeta_image) 1516 encoded_aftl_descriptor = aftl_descriptor.encode() 1517 output.write(encoded_aftl_descriptor) 1518 1519 if padding_size > 0: 1520 descriptor_size = len(vbmeta_image) + len(encoded_aftl_descriptor) 1521 padded_size = avbtool.round_to_multiple(descriptor_size, padding_size) 1522 padding_needed = padded_size - descriptor_size 1523 output.write('\0' * padding_needed) 1524 1525 sys.stdout.write('VBMeta image with AFTL inclusion proofs successfully ' 1526 'created.\n') 1527 return True 1528 1529 def _load_test_process_function(self, vbmeta_image_path, 1530 transparency_log_server, 1531 transparency_log_pub_key, manufacturer_key, 1532 process_number, submission_count, 1533 preserve_icp_images, timeout, result_queue): 1534 """Function to be used by multiprocessing.Process. 1535 1536 Arguments: 1537 vbmeta_image_path: Path to a vbmeta image file. 1538 transparency_log_server: A string in host:port format of transparency log 1539 servers where a inclusion proof is requested from. 1540 transparency_log_pub_key: Path to PEM file containing trusted 1541 public keys that corresponds with the transparency_log_server. 1542 manufacturer_key: Path to PEM file containting the key file used to sign 1543 messages sent to the transparency log servers. 1544 process_number: The number of the processes executing the function. 1545 submission_count: Number of total submissions to perform per 1546 process_count. 1547 preserve_icp_images: Boolean to indicate if the generated vbmeta 1548 image files with inclusion proofs should preserved. 1549 timeout: Duration in seconds before requests to the AFTL times out. A 1550 value of 0 or None means there will be no timeout. 1551 result_queue: Multiprocessing.Queue object for posting execution results. 1552 """ 1553 for count in range(0, submission_count): 1554 version_incremental = 'aftl_load_testing_{}_{}'.format(process_number, 1555 count) 1556 output_file = '{}_icp.img'.format(version_incremental) 1557 output = open(output_file, 'wb') 1558 1559 # Instrumented section. 1560 start_time = time.time() 1561 result = self.make_icp_from_vbmeta( 1562 vbmeta_image_path=vbmeta_image_path, 1563 output=output, 1564 signing_helper=None, 1565 signing_helper_with_files=None, 1566 version_incremental=version_incremental, 1567 transparency_log_servers=[transparency_log_server], 1568 transparency_log_pub_keys=[transparency_log_pub_key], 1569 manufacturer_key=manufacturer_key, 1570 padding_size=0, 1571 timeout=timeout) 1572 end_time = time.time() 1573 1574 output.close() 1575 if not preserve_icp_images: 1576 os.unlink(output_file) 1577 1578 # Puts the result onto the result queue. 1579 execution_time = end_time - start_time 1580 result_queue.put((start_time, end_time, execution_time, 1581 version_incremental, result)) 1582 1583 def load_test_aftl(self, vbmeta_image_path, output, transparency_log_server, 1584 transparency_log_pub_key, manufacturer_key, 1585 process_count, submission_count, stats_filename, 1586 preserve_icp_images, timeout): 1587 """Performs multi-threaded load test on a given AFTL and prints stats. 1588 1589 Arguments: 1590 vbmeta_image_path: Path to a vbmeta image file. 1591 output: File to write the report to. 1592 transparency_log_server: A string in host:port format of transparency log 1593 servers where a inclusion proof is requested from. 1594 transparency_log_pub_key: Path to PEM file containing trusted 1595 public keys that corresponds with the transparency_log_server. 1596 manufacturer_key: Path to PEM file containting the key file used to sign 1597 messages sent to the transparency log servers. 1598 process_count: Number of processes used for parallel testing. 1599 submission_count: Number of total submissions to perform per 1600 process_count. 1601 stats_filename: Path to the stats file to write the raw execution data to. 1602 preserve_icp_images: Boolean to indicate if the generated vbmeta 1603 image files with inclusion proofs should preserved. 1604 timeout: Duration in seconds before requests to the AFTL times out. A 1605 value of 0 or None means there will be no timeout. 1606 1607 Returns: 1608 True if the load tested succeeded without errors; otherwise False. 1609 """ 1610 if process_count < 1 or submission_count < 1: 1611 sys.stderr.write('Values for --processes/--submissions ' 1612 'must be at least 1.\n') 1613 return False 1614 1615 if not stats_filename: 1616 stats_filename = 'load_test_p{}_s{}.csv'.format(process_count, 1617 submission_count) 1618 try: 1619 stats_file = open(stats_filename, 'w') 1620 stats_file.write('start_time,end_time,execution_time,version_incremental,' 1621 'result\n') 1622 except IOError as e: 1623 sys.stderr.write('Could not open stats file {}: {}.\n' 1624 .format(stats_file, e)) 1625 return False 1626 1627 # Launch all the processes with their workloads. 1628 result_queue = multiprocessing.Queue() 1629 processes = set() 1630 execution_times = [] 1631 results = [] 1632 for i in range(0, process_count): 1633 p = multiprocessing.Process( 1634 target=self._load_test_process_function, 1635 args=(vbmeta_image_path, transparency_log_server, 1636 transparency_log_pub_key, manufacturer_key, i, submission_count, 1637 preserve_icp_images, timeout, result_queue)) 1638 p.start() 1639 processes.add(p) 1640 1641 while processes: 1642 # Processes the results queue and writes these to a stats file. 1643 try: 1644 (start_time, end_time, execution_time, version_incremental, 1645 result) = result_queue.get(timeout=1) 1646 stats_file.write('{},{},{},{},{}\n'.format(start_time, end_time, 1647 execution_time, 1648 version_incremental, result)) 1649 execution_times.append(execution_time) 1650 results.append(result) 1651 except Queue.Empty: 1652 pass 1653 1654 # Checks if processes are still alive; if not clean them up. join() would 1655 # have been nicer but we want to continously stream out the stats to file. 1656 for p in processes.copy(): 1657 if not p.is_alive(): 1658 processes.remove(p) 1659 1660 # Prepares stats. 1661 executions = sorted(execution_times) 1662 execution_count = len(execution_times) 1663 median = 0 1664 if execution_count % 2 == 0: 1665 median = (executions[execution_count // 2 - 1] 1666 + executions[execution_count // 2]) / 2 1667 else: 1668 median = executions[execution_count // 2] 1669 1670 # Outputs the stats report. 1671 o = output 1672 o.write('Load testing results:\n') 1673 o.write(' Processes: {}\n'.format(process_count)) 1674 o.write(' Submissions per process: {}\n'.format(submission_count)) 1675 o.write('\n') 1676 o.write(' Submissions:\n') 1677 o.write(' Total: {}\n'.format(len(executions))) 1678 o.write(' Succeeded: {}\n'.format(results.count(True))) 1679 o.write(' Failed: {}\n'.format(results.count(False))) 1680 o.write('\n') 1681 o.write(' Submission execution durations:\n') 1682 o.write(' Average: {:.2f} sec\n'.format( 1683 sum(execution_times) / execution_count)) 1684 o.write(' Median: {:.2f} sec\n'.format(median)) 1685 o.write(' Min: {:.2f} sec\n'.format(min(executions))) 1686 o.write(' Max: {:.2f} sec\n'.format(max(executions))) 1687 1688 # Close the stats file. 1689 stats_file.close() 1690 if results.count(False): 1691 return False 1692 return True 1693 1694 1695class AftlTool(avbtool.AvbTool): 1696 """Object for aftltool command-line tool.""" 1697 1698 def __init__(self): 1699 """Initializer method.""" 1700 self.aftl = Aftl() 1701 super(AftlTool, self).__init__() 1702 1703 def make_icp_from_vbmeta(self, args): 1704 """Implements the 'make_icp_from_vbmeta' sub-command.""" 1705 args = self._fixup_common_args(args) 1706 return self.aftl.make_icp_from_vbmeta(args.vbmeta_image_path, 1707 args.output, 1708 args.signing_helper, 1709 args.signing_helper_with_files, 1710 args.version_incremental, 1711 args.transparency_log_servers, 1712 args.transparency_log_pub_keys, 1713 args.manufacturer_key, 1714 args.padding_size, 1715 args.timeout) 1716 1717 def info_image_icp(self, args): 1718 """Implements the 'info_image_icp' sub-command.""" 1719 return self.aftl.info_image_icp(args.vbmeta_image_path.name, args.output) 1720 1721 def verify_image_icp(self, args): 1722 """Implements the 'verify_image_icp' sub-command.""" 1723 return self.aftl.verify_image_icp(args.vbmeta_image_path.name, 1724 args.transparency_log_pub_keys, 1725 args.output) 1726 1727 def load_test_aftl(self, args): 1728 """Implements the 'load_test_aftl' sub-command.""" 1729 return self.aftl.load_test_aftl(args.vbmeta_image_path, 1730 args.output, 1731 args.transparency_log_server, 1732 args.transparency_log_pub_key, 1733 args.manufacturer_key, 1734 args.processes, 1735 args.submissions, 1736 args.stats_file, 1737 args.preserve_icp_images, 1738 args.timeout) 1739 1740 def run(self, argv): 1741 """Command-line processor. 1742 1743 Arguments: 1744 argv: Pass sys.argv from main. 1745 """ 1746 parser = argparse.ArgumentParser() 1747 subparsers = parser.add_subparsers(title='subcommands') 1748 1749 # Command: make_icp_from_vbmeta 1750 sub_parser = subparsers.add_parser('make_icp_from_vbmeta', 1751 help='Makes an ICP enhanced vbmeta image' 1752 ' from an existing vbmeta image.') 1753 sub_parser.add_argument('--output', 1754 help='Output file name.', 1755 type=argparse.FileType('wb'), 1756 default=sys.stdout) 1757 sub_parser.add_argument('--vbmeta_image_path', 1758 help='Path to a generate vbmeta image file.', 1759 required=True) 1760 sub_parser.add_argument('--version_incremental', 1761 help='Current build ID.', 1762 required=True) 1763 sub_parser.add_argument('--manufacturer_key', 1764 help='Path to the PEM file containing the ' 1765 'manufacturer key for use with the log.', 1766 required=True) 1767 sub_parser.add_argument('--transparency_log_servers', 1768 help='List of transparency log servers in ' 1769 'host:port format. This must not be None and must ' 1770 'be the same size as transparency_log_pub_keys. ' 1771 'Also, transparency_log_servers[n] must correspond ' 1772 'to transparency_log_pub_keys[n] for all values n.', 1773 nargs='*', 1774 required=True) 1775 sub_parser.add_argument('--transparency_log_pub_keys', 1776 help='Paths to PEM files containing transparency ' 1777 'log server key(s). This must not be None and must ' 1778 'be the same size as transparency_log_servers. ' 1779 'Also, transparency_log_pub_keys[n] must ' 1780 'correspond to transparency_log_servers[n] for all ' 1781 'values n.', 1782 nargs='*', 1783 required=True) 1784 sub_parser.add_argument('--padding_size', 1785 metavar='NUMBER', 1786 help='If non-zero, pads output with NUL bytes so ' 1787 'its size is a multiple of NUMBER (default: 0)', 1788 type=avbtool.parse_number, 1789 default=0) 1790 sub_parser.add_argument('--timeout', 1791 metavar='SECONDS', 1792 help='Timeout in seconds for transparency log ' 1793 'requests (default: 600 sec). A value of 0 means ' 1794 'no timeout.', 1795 type=avbtool.parse_number, 1796 default=600) 1797 self._add_common_args(sub_parser) 1798 sub_parser.set_defaults(func=self.make_icp_from_vbmeta) 1799 1800 # Command: info_image_icp 1801 sub_parser = subparsers.add_parser( 1802 'info_image_icp', 1803 help='Show information about AFTL ICPs in vbmeta or footer.') 1804 sub_parser.add_argument('--vbmeta_image_path', 1805 help='Path to vbmeta image for AFTL information.', 1806 type=argparse.FileType('rb'), 1807 required=True) 1808 sub_parser.add_argument('--output', 1809 help='Write info to file', 1810 type=argparse.FileType('wt'), 1811 default=sys.stdout) 1812 sub_parser.set_defaults(func=self.info_image_icp) 1813 1814 # Arguments for verify_image_icp. 1815 sub_parser = subparsers.add_parser( 1816 'verify_image_icp', 1817 help='Verify AFTL ICPs in vbmeta or footer.') 1818 1819 sub_parser.add_argument('--vbmeta_image_path', 1820 help='Image to verify the inclusion proofs.', 1821 type=argparse.FileType('rb'), 1822 required=True) 1823 sub_parser.add_argument('--transparency_log_pub_keys', 1824 help='Paths to PEM files containing transparency ' 1825 'log server key(s). This must not be None.', 1826 nargs='*', 1827 required=True) 1828 sub_parser.add_argument('--output', 1829 help='Write info to file', 1830 type=argparse.FileType('wt'), 1831 default=sys.stdout) 1832 sub_parser.set_defaults(func=self.verify_image_icp) 1833 1834 # Command: load_test_aftl 1835 sub_parser = subparsers.add_parser( 1836 'load_test_aftl', 1837 help='Perform load testing against one AFTL log server. Note: This MUST' 1838 ' not be performed against a production system.') 1839 sub_parser.add_argument('--vbmeta_image_path', 1840 help='Path to a generate vbmeta image file.', 1841 required=True) 1842 sub_parser.add_argument('--output', 1843 help='Write report to file.', 1844 type=argparse.FileType('wt'), 1845 default=sys.stdout) 1846 sub_parser.add_argument('--manufacturer_key', 1847 help='Path to the PEM file containing the ' 1848 'manufacturer key for use with the log.', 1849 required=True) 1850 sub_parser.add_argument('--transparency_log_server', 1851 help='Transparency log servers to test against in ' 1852 'host:port format.', 1853 required=True) 1854 sub_parser.add_argument('--transparency_log_pub_key', 1855 help='Paths to PEM file containing transparency ' 1856 'log server key.', 1857 required=True) 1858 sub_parser.add_argument('--processes', 1859 help='Number of parallel processes to use for ' 1860 'testing (default: 1).', 1861 type=avbtool.parse_number, 1862 default=1) 1863 sub_parser.add_argument('--submissions', 1864 help='Number of submissions to perform against the ' 1865 'log per process (default: 1).', 1866 type=avbtool.parse_number, 1867 default=1) 1868 sub_parser.add_argument('--stats_file', 1869 help='Path to the stats file to write the raw ' 1870 'execution data to (Default: ' 1871 'load_test_p[processes]_s[submissions].csv.') 1872 sub_parser.add_argument('--preserve_icp_images', 1873 help='Boolean flag to indicate if the generated ' 1874 'vbmeta image files with inclusion proofs should ' 1875 'preserved.', 1876 action='store_true') 1877 sub_parser.add_argument('--timeout', 1878 metavar='SECONDS', 1879 help='Timeout in seconds for transparency log ' 1880 'requests (default: 0). A value of 0 means ' 1881 'no timeout.', 1882 type=avbtool.parse_number, 1883 default=0) 1884 sub_parser.set_defaults(func=self.load_test_aftl) 1885 1886 args = parser.parse_args(argv[1:]) 1887 try: 1888 success = args.func(args) 1889 if not success: 1890 # Signals to calling tools that the command has failed. 1891 sys.exit(1) 1892 except AftlError as e: 1893 # Signals to calling tools that an unhandled exeception occured. 1894 sys.stderr.write('Unhandled AftlError occured: {}\n'.format(e)) 1895 sys.exit(2) 1896 1897if __name__ == '__main__': 1898 tool = AftlTool() 1899 tool.run(sys.argv) 1900