• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2
3# Copyright 2020, The Android Open Source Project
4#
5# Permission is hereby granted, free of charge, to any person
6# obtaining a copy of this software and associated documentation
7# files (the "Software"), to deal in the Software without
8# restriction, including without limitation the rights to use, copy,
9# modify, merge, publish, distribute, sublicense, and/or sell copies
10# of the Software, and to permit persons to whom the Software is
11# furnished to do so, subject to the following conditions:
12#
13# The above copyright notice and this permission notice shall be
14# included in all copies or substantial portions of the Software.
15#
16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23# SOFTWARE.
24#
25"""Command-line tool for AFTL support for Android Verified Boot images."""
26
27from __future__ import division
28
29import argparse
30import base64
31import binascii
32import hashlib
33import json
34import multiprocessing
35import os
36import Queue  # pylint: disable=bad-python3-import
37import struct
38import subprocess
39import sys
40import tempfile
41import time
42
43
44import avbtool
45from proto import aftl_pb2
46from proto import api_pb2
47from proto.crypto import sigpb
48
49
50class AftlError(Exception):
51  """Application-specific errors.
52
53  These errors represent issues for which a stack-trace should not be
54  presented.
55
56  Attributes:
57    message: Error message.
58  """
59
60  def __init__(self, message):
61    Exception.__init__(self, message)
62
63
64def rsa_key_read_pem_bytes(key_path):
65  """Reads the bytes out of the passed in PEM file.
66
67  Arguments:
68    key_path: A string containing the path to the PEM file.
69
70  Returns:
71    A bytearray containing the DER encoded bytes in the PEM file.
72
73  Raises:
74    AftlError: If openssl cannot decode the PEM file.
75  """
76  # Use openssl to decode the PEM file.
77  args = ['openssl', 'rsa', '-in', key_path, '-pubout', '-outform', 'DER']
78  p = subprocess.Popen(args,
79                       stdin=subprocess.PIPE,
80                       stdout=subprocess.PIPE,
81                       stderr=subprocess.PIPE)
82  (pout, perr) = p.communicate()
83  retcode = p.wait()
84  if retcode != 0:
85    raise AftlError('Error decoding: {}'.format(perr))
86  return bytearray(pout)
87
88
89def check_signature(log_root, log_root_sig,
90                    transparency_log_pub_key):
91  """Validates the signature provided by the transparency log.
92
93  Arguments:
94    log_root: The transparency log_root data structure.
95    log_root_sig: The signature of the transparency log_root data structure.
96    transparency_log_pub_key: The file path to the transparency log public key.
97
98  Returns:
99    True if the signature check passes, otherwise False.
100  """
101
102  logsig_tmp = tempfile.NamedTemporaryFile()
103  logsig_tmp.write(log_root_sig)
104  logsig_tmp.flush()
105  logroot_tmp = tempfile.NamedTemporaryFile()
106  logroot_tmp.write(log_root)
107  logroot_tmp.flush()
108
109  p = subprocess.Popen(['openssl', 'dgst', '-sha256', '-verify',
110                        transparency_log_pub_key,
111                        '-signature', logsig_tmp.name, logroot_tmp.name],
112                       stdin=subprocess.PIPE,
113                       stdout=subprocess.PIPE,
114                       stderr=subprocess.PIPE)
115
116  p.communicate()
117  retcode = p.wait()
118  if not retcode:
119    return True
120  return False
121
122
123# AFTL Merkle Tree Functionality
124def rfc6962_hash_leaf(leaf):
125  """RFC6962 hashing function for hashing leaves of a Merkle tree.
126
127  Arguments:
128    leaf: A bytearray containing the Merkle tree leaf to be hashed.
129
130  Returns:
131    A bytearray containing the RFC6962 SHA256 hash of the leaf.
132  """
133  hasher = hashlib.sha256()
134  # RFC6962 states a '0' byte should be prepended to the data.
135  # This is done in conjunction with the '1' byte for non-leaf
136  # nodes for 2nd preimage attack resistance.
137  hasher.update(b'\x00')
138  hasher.update(leaf)
139  return hasher.digest()
140
141
142def rfc6962_hash_children(l, r):
143  """Calculates the inner Merkle tree node hash of child nodes l and r.
144
145  Arguments:
146    l: A bytearray containing the left child node to be hashed.
147    r: A bytearray containing the right child node to be hashed.
148
149  Returns:
150    A bytearray containing the RFC6962 SHA256 hash of 1|l|r.
151  """
152  hasher = hashlib.sha256()
153  # RFC6962 states a '1' byte should be prepended to the concatenated data.
154  # This is done in conjunction with the '0' byte for leaf
155  # nodes for 2nd preimage attack resistance.
156  hasher.update(b'\x01')
157  hasher.update(l)
158  hasher.update(r)
159  return hasher.digest()
160
161
162def chain_border_right(seed, proof):
163  """Computes a subtree hash along the left-side tree border.
164
165  Arguments:
166    seed: A bytearray containing the starting hash.
167    proof: A list of bytearrays representing the hashes in the inclusion proof.
168
169  Returns:
170    A bytearray containing the left-side subtree hash.
171  """
172  for h in proof:
173    seed = rfc6962_hash_children(h, seed)
174  return seed
175
176
177def chain_inner(seed, proof, leaf_index):
178  """Computes a subtree hash on or below the tree's right border.
179
180  Arguments:
181    seed: A bytearray containing the starting hash.
182    proof: A list of bytearrays representing the hashes in the inclusion proof.
183    leaf_index: The current leaf index.
184
185  Returns:
186    A bytearray containing the subtree hash.
187  """
188  for i, h in enumerate(proof):
189    if leaf_index >> i & 1 == 0:
190      seed = rfc6962_hash_children(seed, h)
191    else:
192      seed = rfc6962_hash_children(h, seed)
193  return seed
194
195
196def root_from_icp(leaf_index, tree_size, proof, leaf_hash):
197  """Calculates the expected Merkle tree root hash.
198
199  Arguments:
200    leaf_index: The current leaf index.
201    tree_size: The number of nodes in the Merkle tree.
202    proof: A list of bytearrays containing the inclusion proof.
203    leaf_hash: A bytearray containing the initial leaf hash.
204
205  Returns:
206    A bytearray containing the calculated Merkle tree root hash.
207
208  Raises:
209    AftlError: If invalid parameters are passed in.
210  """
211  if leaf_index < 0:
212    raise AftlError('Invalid leaf_index value: {}'.format(leaf_index))
213  if tree_size < 0:
214    raise AftlError('Invalid tree_size value: {}'.format(tree_size))
215  if leaf_index >= tree_size:
216    err_str = 'leaf_index cannot be equal or larger than tree_size: {}, {}'
217    raise AftlError(err_str.format(leaf_index, tree_size))
218  if proof is None:
219    raise AftlError('Inclusion proof not provided.')
220  if leaf_hash is None:
221    raise AftlError('No leaf hash provided.')
222  # Calculate the point to split the proof into two parts.
223  # The split is where the paths to leaves diverge.
224  inner = (leaf_index ^ (tree_size - 1)).bit_length()
225  result = chain_inner(leaf_hash, proof[:inner], leaf_index)
226  result = chain_border_right(result, proof[inner:])
227  return result
228
229
230class AftlIcpHeader(object):
231  """A class for the transparency log inclusion proof header.
232
233  Attributes:
234    magic: Magic for identifying the ICP header.
235    required_icp_version_major: The major version of AVB that wrote the entry.
236    required_icp_version_minor: The minor version of AVB that wrote the entry.
237    aftl_descriptor_size: Total size of the header's AftlDescriptor.
238    icp_count: Number of inclusion proofs represented in this structure.
239  """
240
241  SIZE = 18  # The size of the structure, in bytes
242  MAGIC = 'AFTL'
243  FORMAT_STRING = ('!4s2L'  # magic, major & minor version
244                   'L'      # descriptor size
245                   'H')     # number of inclusion proof entries
246
247  def __init__(self, data=None):
248    """Initializes a new transparency header object.
249
250    Arguments:
251      data: If not None, must be a bytearray of size |SIZE|.
252
253    Raises:
254      AftlError: If invalid structure for AftlIcpHeader.
255    """
256    assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
257
258    if data:
259      (self.magic, self.required_icp_version_major,
260       self.required_icp_version_minor, self.aftl_descriptor_size,
261       self.icp_count) = struct.unpack(self.FORMAT_STRING, data)
262    else:
263      self.magic = self.MAGIC
264      self.required_icp_version_major = avbtool.AVB_VERSION_MAJOR
265      self.required_icp_version_minor = avbtool.AVB_VERSION_MINOR
266      self.aftl_descriptor_size = self.SIZE
267      self.icp_count = 0
268    if not self.is_valid():
269      raise AftlError('Invalid structure for AftlIcpHeader')
270
271  def save(self, output):
272    """Serializes the transparency header |SIZE| to disk.
273
274    Arguments:
275      output: The object to write the header to.
276
277    Raises:
278      AftlError: If invalid structure for AftlIcpHeader.
279    """
280    output.write(self.encode())
281
282  def encode(self):
283    """Serializes the header |SIZE| to a bytearray().
284
285    Returns:
286      A bytearray() with the encoded header.
287
288    Raises:
289      AftlError: If invalid structure for AftlIcpHeader.
290    """
291    if not self.is_valid():
292      raise AftlError('Invalid structure for AftlIcpHeader')
293    return struct.pack(self.FORMAT_STRING, self.magic,
294                       self.required_icp_version_major,
295                       self.required_icp_version_minor,
296                       self.aftl_descriptor_size,
297                       self.icp_count)
298
299  def is_valid(self):
300    """Ensures that values in an AftlIcpHeader structure are sane.
301
302    Returns:
303      True if the values in the AftlIcpHeader are sane, False otherwise.
304    """
305    if self.magic != AftlIcpHeader.MAGIC:
306      sys.stderr.write(
307          'ICP Header: magic value mismatch: {}\n'.format(self.magic))
308      return False
309
310    if self.required_icp_version_major > avbtool.AVB_VERSION_MAJOR:
311      sys.stderr.write('ICP header: major version mismatch: {}\n'.format(
312          self.required_icp_version_major))
313      return False
314
315    if self.required_icp_version_minor > avbtool.AVB_VERSION_MINOR:
316      sys.stderr.write('ICP header: minor version mismatch: {}\n'.format(
317          self.required_icp_version_minor))
318      return False
319
320    if self.aftl_descriptor_size < self.SIZE:
321      sys.stderr.write('ICP Header: Invalid descriptor size: {}\n'.format(
322          self.aftl_descriptor_size))
323      return False
324
325    if self.icp_count < 0 or self.icp_count > 65535:
326      sys.stderr.write(
327          'ICP header: ICP entry count out of range: {}\n'.format(
328              self.icp_count))
329      return False
330    return True
331
332  def print_desc(self, o):
333    """Print the descriptor.
334
335    Arguments:
336      o: The object to write the output to.
337    """
338    i = ' ' * 4
339    fmt = '{}{:25}{}\n'
340    o.write(fmt.format(i, 'Major version:', self.required_icp_version_major))
341    o.write(fmt.format(i, 'Minor version:', self.required_icp_version_minor))
342    o.write(fmt.format(i, 'Descriptor size:', self.aftl_descriptor_size))
343    o.write(fmt.format(i, 'ICP entries count:', self.icp_count))
344
345
346class AftlIcpEntry(object):
347  """A class for the transparency log inclusion proof entries.
348
349  The data that represents each of the components of the ICP entry are stored
350  immediately following the ICP entry header. The format is log_url,
351  SignedLogRoot, and inclusion proof hashes.
352
353  Attributes:
354    log_url_size: Length of the string representing the transparency log URL.
355    leaf_index: Leaf index in the transparency log representing this entry.
356    log_root_descriptor_size: Size of the transparency log's SignedLogRoot.
357    fw_info_leaf_size: Size of the FirmwareInfo leaf passed to the log.
358    log_root_sig_size: Size in bytes of the log_root_signature
359    proof_hash_count: Number of hashes comprising the inclusion proof.
360    inc_proof_size: The total size of the inclusion proof, in bytes.
361    log_url: The URL for the transparency log that generated this inclusion
362        proof.
363    log_root_descriptor: The data comprising the signed tree head structure.
364    fw_info_leaf: The data comprising the FirmwareInfo leaf.
365    log_root_signature: The data comprising the log root signature.
366    proofs: The hashes comprising the inclusion proof.
367
368  """
369  SIZE = 27  # The size of the structure, in bytes
370  FORMAT_STRING = ('!L'   # transparency log server url size
371                   'Q'    # leaf index
372                   'L'    # log root descriptor size
373                   'L'    # firmware info leaf size
374                   'H'    # log root signature size
375                   'B'    # number of hashes in the inclusion proof
376                   'L')   # size of the inclusion proof in bytes
377  # These are used to capture the log_url, log_root_descriptor,
378  # fw_info leaf, log root signature, and the proofs elements for the
379  # encode & save functions.
380
381  def __init__(self, data=None):
382    """Initializes a new ICP entry object.
383
384    Arguments:
385      data: If not None, must be a bytearray of size >= |SIZE|.
386
387    Raises:
388      AftlError: If data does not represent a well-formed AftlIcpEntry.
389    """
390    # Assert the header structure is of a sane size.
391    assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
392
393    if data:
394      # Deserialize the header from the data descriptor.
395      (self._log_url_size_expected,
396       self.leaf_index,
397       self._log_root_descriptor_size_expected,
398       self._fw_info_leaf_size_expected,
399       self._log_root_sig_size_expected,
400       self._proof_hash_count_expected,
401       self._inc_proof_size_expected) = struct.unpack(self.FORMAT_STRING,
402                                                      data[0:self.SIZE])
403
404      # Deserialize ICP entry components from the data.
405      expected_format_string = '{}s{}s{}s{}s{}s'.format(
406          self._log_url_size_expected,
407          self._log_root_descriptor_size_expected,
408          self._fw_info_leaf_size_expected,
409          self._log_root_sig_size_expected,
410          self._inc_proof_size_expected)
411
412      (self.log_url, log_root_descriptor_bytes, fw_info_leaf_bytes,
413       self.log_root_signature, proof_bytes) = struct.unpack(
414           expected_format_string, data[self.SIZE:self.get_expected_size()])
415
416      self.log_root_descriptor = TrillianLogRootDescriptor(
417          log_root_descriptor_bytes)
418      self.fw_info_leaf = FirmwareInfoLeaf(fw_info_leaf_bytes)
419
420      self.proofs = []
421      if self._proof_hash_count_expected > 0:
422        proof_idx = 0
423        hash_size = self._inc_proof_size_expected // self._proof_hash_count_expected
424        for _ in range(self._proof_hash_count_expected):
425          proof = proof_bytes[proof_idx:(proof_idx+hash_size)]
426          self.proofs.append(proof)
427          proof_idx += hash_size
428    else:
429      self.leaf_index = 0
430      self.log_url = ''
431      self.log_root_descriptor = TrillianLogRootDescriptor()
432      self.fw_info_leaf = FirmwareInfoLeaf()
433      self.log_root_signature = ''
434      self.proofs = []
435    if not self.is_valid():
436      raise AftlError('Invalid structure for AftlIcpEntry')
437
438  @property
439  def log_url_size(self):
440    """Gets the size of the log_url attribute."""
441    if hasattr(self, 'log_url'):
442      return len(self.log_url)
443    return self._log_url_size_expected
444
445  @property
446  def log_root_descriptor_size(self):
447    """Gets the size of the log_root_descriptor attribute."""
448    if hasattr(self, 'log_root_descriptor'):
449      return self.log_root_descriptor.get_expected_size()
450    return self._log_root_descriptor_size_expected
451
452  @property
453  def fw_info_leaf_size(self):
454    """Gets the size of the fw_info_leaf attribute."""
455    if hasattr(self, 'fw_info_leaf'):
456      return self.fw_info_leaf.get_expected_size()
457    return self._fw_info_leaf_size_expected
458
459  @property
460  def log_root_sig_size(self):
461    """Gets the size of the log_root signature."""
462    if hasattr(self, 'log_root_signature'):
463      return len(self.log_root_signature)
464    return self._log_root_sig_size_expected
465
466  @property
467  def proof_hash_count(self):
468    """Gets the number of proof hashes."""
469    if hasattr(self, 'proofs'):
470      return len(self.proofs)
471    return self._proof_hash_count_expected
472
473  @property
474  def inc_proof_size(self):
475    """Gets the total size of the proof hashes in bytes."""
476    if hasattr(self, 'proofs'):
477      result = 0
478      for proof in self.proofs:
479        result += len(proof)
480      return result
481    return self._inc_proof_size_expected
482
483  def verify_icp(self, transparency_log_pub_key):
484    """Verifies the contained inclusion proof given the public log key.
485
486    Arguments:
487      transparency_log_pub_key: The path to the trusted public key for the log.
488
489    Returns:
490      True if the calculated signature matches AftlIcpEntry's. False otherwise.
491    """
492    if not transparency_log_pub_key:
493      return False
494
495    leaf_hash = rfc6962_hash_leaf(self.fw_info_leaf.encode())
496    calc_root = root_from_icp(self.leaf_index,
497                              self.log_root_descriptor.tree_size,
498                              self.proofs,
499                              leaf_hash)
500    if ((calc_root == self.log_root_descriptor.root_hash) and
501        check_signature(
502            self.log_root_descriptor.encode(),
503            self.log_root_signature,
504            transparency_log_pub_key)):
505      return True
506    return False
507
508  def verify_vbmeta_image(self, vbmeta_descriptor, transparency_log_pub_key):
509    """Verify the inclusion proof for the given vbmeta_descriptor.
510
511    Arguments:
512      vbmeta_descriptor: A bytearray with the vbmeta descriptor.
513      transparency_log_pub_key: File path to the PEM file containing the trusted
514        transparency log public key.
515
516    Returns:
517      True if the inclusion proof validates and the vbmeta hash of the given
518      descriptor matches the one in the fw_info_leaf; otherwise False.
519    """
520    if not vbmeta_descriptor:
521      return False
522
523    # Calculate the hash of the vbmeta image.
524    vbmeta_hash = hashlib.sha256(vbmeta_descriptor).digest()
525
526    # Validates the inclusion proof and then compare the calculated vbmeta_hash
527    # against the one in the inclusion proof.
528    return (self.verify_icp(transparency_log_pub_key)
529            and self.fw_info_leaf.vbmeta_hash == vbmeta_hash)
530
531  def save(self, output):
532    """Serializes the transparency header |SIZE| and data to disk.
533
534    Arguments:
535      output: The object to write the header to.
536
537    Raises:
538      AftlError: If invalid entry structure.
539    """
540    output.write(self.encode())
541
542  def encode(self):
543    """Serializes the header |SIZE| and data to a bytearray().
544
545    Returns:
546      A bytearray() with the encoded header.
547
548    Raises:
549      AftlError: If invalid entry structure.
550    """
551    proof_bytes = bytearray()
552    if not self.is_valid():
553      raise AftlError('Invalid AftlIcpEntry structure')
554
555    expected_format_string = '{}{}s{}s{}s{}s{}s'.format(
556        self.FORMAT_STRING,
557        self.log_url_size,
558        self.log_root_descriptor_size,
559        self.fw_info_leaf_size,
560        self.log_root_sig_size,
561        self.inc_proof_size)
562
563    for proof in self.proofs:
564      proof_bytes.extend(proof)
565
566    return struct.pack(expected_format_string,
567                       self.log_url_size, self.leaf_index,
568                       self.log_root_descriptor_size, self.fw_info_leaf_size,
569                       self.log_root_sig_size, self.proof_hash_count,
570                       self.inc_proof_size, self.log_url,
571                       self.log_root_descriptor.encode(),
572                       str(self.fw_info_leaf.encode()),
573                       str(self.log_root_signature),
574                       str(proof_bytes))
575
576  def translate_response(self, transparency_log, afi_response):
577    """Translates an AddFirmwareInfoResponse object to an AftlIcpEntry.
578
579    Arguments:
580      transparency_log: String representing the transparency log URL.
581      afi_response: The AddFirmwareResponse object to translate.
582    """
583    self.log_url = transparency_log
584
585    # Deserializes from AddFirmwareInfoResponse.
586    self.leaf_index = afi_response.fw_info_proof.proof.leaf_index
587    self.log_root_descriptor = TrillianLogRootDescriptor(
588        afi_response.fw_info_proof.sth.log_root)
589    self.fw_info_leaf = FirmwareInfoLeaf(afi_response.fw_info_leaf)
590    self.log_root_signature = afi_response.fw_info_proof.sth.log_root_signature
591    self.proofs = afi_response.fw_info_proof.proof.hashes
592
593  def get_expected_size(self):
594    """Gets the expected size of the full entry out of the header.
595
596    Returns:
597      The expected size of the AftlIcpEntry from the header.
598    """
599    return (self.SIZE + self.log_url_size + self.log_root_descriptor_size +
600            self.fw_info_leaf_size + self.log_root_sig_size +
601            self.inc_proof_size)
602
603  def is_valid(self):
604    """Ensures that values in an AftlIcpEntry structure are sane.
605
606    Returns:
607      True if the values in the AftlIcpEntry are sane, False otherwise.
608    """
609    if self.leaf_index < 0:
610      sys.stderr.write('ICP entry: leaf index out of range: '
611                       '{}.\n'.format(self.leaf_index))
612      return False
613
614    if (not self.log_root_descriptor or
615        not isinstance(self.log_root_descriptor, TrillianLogRootDescriptor) or
616        not self.log_root_descriptor.is_valid()):
617      sys.stderr.write('ICP entry: invalid TrillianLogRootDescriptor.\n')
618      return False
619
620    if (not self.fw_info_leaf or
621        not isinstance(self.fw_info_leaf, FirmwareInfoLeaf)):
622      sys.stderr.write('ICP entry: invalid FirmwareInfo.\n')
623      return False
624    return True
625
626  def print_desc(self, o):
627    """Print the descriptor.
628
629    Arguments:
630      o: The object to write the output to.
631    """
632    i = ' ' * 4
633    fmt = '{}{:25}{}\n'
634    o.write(fmt.format(i, 'Transparency Log:', self.log_url))
635    o.write(fmt.format(i, 'Leaf index:', self.leaf_index))
636    o.write('    ICP hashes:              ')
637    for i, proof_hash in enumerate(self.proofs):
638      if i != 0:
639        o.write(' ' * 29)
640      o.write('{}\n'.format(binascii.hexlify(proof_hash)))
641    self.log_root_descriptor.print_desc(o)
642    self.fw_info_leaf.print_desc(o)
643
644
645class TrillianLogRootDescriptor(object):
646  """A class representing the Trillian log_root descriptor.
647
648  Taken from Trillian definitions:
649  https://github.com/google/trillian/blob/master/trillian.proto#L255
650
651  Attributes:
652    version: The version number of the descriptor. Currently only version=1 is
653        supported.
654    tree_size: The size of the tree.
655    root_hash_size: The size of the root hash in bytes. Valid values are between
656        0 and 128.
657    root_hash: The root hash as bytearray().
658    timestamp: The timestamp in nanoseconds.
659    revision: The revision number as long.
660    metadata_size: The size of the metadata in bytes. Valid values are between
661        0 and 65535.
662    metadata: The metadata as bytearray().
663  """
664  FORMAT_STRING_PART_1 = ('!H'  # version
665                          'Q'   # tree_size
666                          'B'   # root_hash_size
667                         )
668
669  FORMAT_STRING_PART_2 = ('!Q'  # timestamp
670                          'Q'   # revision
671                          'H'   # metadata_size
672                         )
673
674  def __init__(self, data=None):
675    """Initializes a new TrillianLogRoot descriptor."""
676    if data:
677      # Parses first part of the log_root descriptor.
678      data_length = struct.calcsize(self.FORMAT_STRING_PART_1)
679      (self.version, self.tree_size, self.root_hash_size) = struct.unpack(
680          self.FORMAT_STRING_PART_1, data[0:data_length])
681      data = data[data_length:]
682
683      # Parses the root_hash bytes if the size indicates existance.
684      if self.root_hash_size > 0:
685        self.root_hash = data[0:self.root_hash_size]
686        data = data[self.root_hash_size:]
687      else:
688        self.root_hash = bytearray()
689
690      # Parses second part of the log_root descriptor.
691      data_length = struct.calcsize(self.FORMAT_STRING_PART_2)
692      (self.timestamp, self.revision, self.metadata_size) = struct.unpack(
693          self.FORMAT_STRING_PART_2, data[0:data_length])
694      data = data[data_length:]
695
696      # Parses the metadata if the size indicates existance.
697      if self.metadata_size > 0:
698        self.metadata = data[0:self.metadata_size]
699      else:
700        self.metadata = bytearray()
701    else:
702      self.version = 1
703      self.tree_size = 0
704      self.root_hash_size = 0
705      self.root_hash = bytearray()
706      self.timestamp = 0
707      self.revision = 0
708      self.metadata_size = 0
709      self.metadata = bytearray()
710
711    if not self.is_valid():
712      raise AftlError('Invalid structure for TrillianLogRootDescriptor.')
713
714  def get_expected_size(self):
715    """Calculates the expected size of the TrillianLogRootDescriptor.
716
717    Returns:
718      The expected size of the TrillianLogRootDescriptor.
719    """
720    return (struct.calcsize(self.FORMAT_STRING_PART_1) + self.root_hash_size +
721            struct.calcsize(self.FORMAT_STRING_PART_2) + self.metadata_size)
722
723  def encode(self):
724    """Serializes the TrillianLogDescriptor to a bytearray().
725
726    Returns:
727      A bytearray() with the encoded header.
728
729    Raises:
730      AftlError: If invalid entry structure.
731    """
732    if not self.is_valid():
733      raise AftlError('Invalid structure for TrillianLogRootDescriptor.')
734
735    expected_format_string = '{}{}s{}{}s'.format(
736        self.FORMAT_STRING_PART_1,
737        self.root_hash_size,
738        self.FORMAT_STRING_PART_2[1:],
739        self.metadata_size)
740
741    return struct.pack(expected_format_string,
742                       self.version, self.tree_size, self.root_hash_size,
743                       str(self.root_hash), self.timestamp, self.revision,
744                       self.metadata_size, str(self.metadata))
745
746  def is_valid(self):
747    """Ensures that values in the descritor are sane.
748
749    Returns:
750      True if the values are sane; otherwise False.
751    """
752    cls = self.__class__.__name__
753    if self.version != 1:
754      sys.stderr.write('{}: Bad version value {}.\n'.format(cls, self.version))
755      return False
756    if self.tree_size < 0:
757      sys.stderr.write('{}: Bad tree_size value {}.\n'.format(cls,
758                                                              self.tree_size))
759      return False
760    if self.root_hash_size < 0 or self.root_hash_size > 128:
761      sys.stderr.write('{}: Bad root_hash_size value {}.\n'.format(
762          cls, self.root_hash_size))
763      return False
764    if len(self.root_hash) != self.root_hash_size:
765      sys.stderr.write('{}: root_hash_size {} does not match with length of '
766                       'root_hash {}.\n'.format(cls, self.root_hash_size,
767                                                len(self.root_hash)))
768      return False
769    if self.timestamp < 0:
770      sys.stderr.write('{}: Bad timestamp value {}.\n'.format(cls,
771                                                              self.timestamp))
772      return False
773    if self.revision < 0:
774      sys.stderr.write('{}: Bad revision value {}.\n'.format(cls,
775                                                             self.revision))
776      return False
777    if self.metadata_size < 0 or self.metadata_size > 65535:
778      sys.stderr.write('{}: Bad metadatasize value {}.\n'.format(
779          cls, self.metadata_size))
780      return False
781    if len(self.metadata) != self.metadata_size:
782      sys.stderr.write('{}: metadata_size {} does not match with length of'
783                       'metadata {}.\n'.format(cls, self.metadata_size,
784                                               len(self.metadata)))
785      return False
786    return True
787
788  def print_desc(self, o):
789    """Print the descriptor.
790
791    Arguments:
792      o: The object to write the output to.
793    """
794    o.write('    Log Root Descriptor:\n')
795    i = ' ' * 6
796    fmt = '{}{:23}{}\n'
797    o.write(fmt.format(i, 'Version:', self.version))
798    o.write(fmt.format(i, 'Tree size:', self.tree_size))
799    o.write(fmt.format(i, 'Root hash size:', self.root_hash_size))
800    if self.root_hash_size > 0:
801      o.write(fmt.format(i, 'Root hash:', binascii.hexlify(self.root_hash)))
802      o.write(fmt.format(i, 'Timestamp (ns):', self.timestamp))
803    o.write(fmt.format(i, 'Revision:', self.revision))
804    o.write(fmt.format(i, 'Metadata size:', self.metadata_size))
805    if self.metadata_size > 0:
806      o.write(fmt.format(i, 'Metadata:', binascii.hexlify(self.metadata)))
807
808
809class FirmwareInfoLeaf(object):
810  """A class representing the FirmwareInfo leaf.
811
812  AFTL returns the fw_info_leaf as a JSON blob and this class is able to
813  parse the blog and provide necessary attributes needed for validation.
814
815  Attributes:
816    vbmeta_hash: This is the SHA256 hash of vbmeta.
817    version_incremental: Subcomponent of the build fingerprint as defined at
818      https://source.android.com/compatibility/android-cdd#3_2_2_build_parameters.
819      For example, a Pixel device with the following build fingerprint
820      google/crosshatch/crosshatch:9/PQ3A.190605.003/5524043:user/release-keys,
821      would have 5524043 for the version incremental.
822    platform_key: Public key of the platform. This is the same key used to sign
823      the vbmeta.
824    manufacturer_key_hash:  SHA256 of the manufacturer public key (DER-encoded,
825      x509 subjectPublicKeyInfo format). The public key MUST already be in the
826      list of root keys known and trusted by the AFTL.
827    description: Free form description field. It can be used to annotate this
828      message with further context on the build (e.g., carrier specific build).
829  """
830
831  def __init__(self, data=None):
832    """Initializes a new FirmwareInfoLeaf descriptor."""
833    if data:
834      # We have to preserve the original fw_info_leaf bytes in order to preserve
835      # hash equivalence with what is stored in the Trillian log and matches up
836      # with the proofs.
837      self._fw_info_leaf_bytes = data
838
839      # Deserialize the JSON blob and keep only the FirmwareInfo parts.
840      try:
841        fw_info_leaf = json.loads(self._fw_info_leaf_bytes)
842        self._fw_info_leaf_dict = (
843            fw_info_leaf['Value']['FwInfo']['info']['info'])
844      except (ValueError, KeyError) as e:
845        raise AftlError('Invalid structure for FirmwareInfoLeaf: {}'.format(e))
846    else:
847      self._fw_info_leaf_bytes = ''
848      self._fw_info_leaf_dict = dict()
849
850    if not self.is_valid():
851      raise AftlError('Invalid structure for FirmwareInfoLeaf.')
852
853  @property
854  def vbmeta_hash(self):
855    """Gets the vbmeta_hash attribute."""
856    return self._lookup_base64_attribute('vbmeta_hash')
857
858  @property
859  def version_incremental(self):
860    """Gets the version_incremental attribute."""
861    return self._fw_info_leaf_dict.get('version_incremental')
862
863  @property
864  def platform_key(self):
865    """Gets the platform key attribute."""
866    return self._lookup_base64_attribute('platform_key')
867
868  @property
869  def manufacturer_key_hash(self):
870    """Gets the manufacturer_key_hash attribute."""
871    return self._lookup_base64_attribute('manufacturer_key_hash')
872
873  @property
874  def description(self):
875    """Gets the description attribute."""
876    return self._fw_info_leaf_dict.get('description')
877
878  def _lookup_base64_attribute(self, key):
879    """Looks up an attribute that is Base64 encoded and decodes it.
880
881    Arguments:
882      key: The name of the attribute to look up.
883
884    Returns:
885      The attribute value or None if not defined.
886    """
887    result = self._fw_info_leaf_dict.get(key)
888    if result:
889      result = base64.b64decode(result)
890    return result
891
892  def get_expected_size(self):
893    """Gets the expected size of the JSON-serialized FirmwareInfoLeaf.
894
895    Returns:
896      The expected size of the FirmwareInfo leaf in byte or 0 if not initalized.
897    """
898    if not self._fw_info_leaf_bytes:
899      return 0
900    return len(self._fw_info_leaf_bytes)
901
902  def encode(self):
903    """Serializes the FirmwareInfoLeaf.
904
905    Returns:
906      A bytearray() with the JSON-serialized FirmwareInfoLeaf.
907    """
908    return self._fw_info_leaf_bytes
909
910  def is_valid(self):
911    """Ensures that values in the descritor are sane.
912
913    Returns:
914      True if the values are sane; otherwise False.
915    """
916    # Checks that the decode fw_info_leaf at max contains values defined in the
917    # FirmwareInfo proto buf.
918    expected_fields = set(aftl_pb2.FirmwareInfo()
919                          .DESCRIPTOR.fields_by_name.keys())
920    actual_fields = set(self._fw_info_leaf_dict.keys())
921    if actual_fields.issubset(expected_fields):
922      return True
923    return False
924
925  def print_desc(self, o):
926    """Print the descriptor.
927
928    Arguments:
929      o: The object to write the output to.
930    """
931    o.write('    Firmware Info Leaf:\n')
932    # The order of the fields is based on the definition in
933    # proto.aftl_pb2.FirmwareInfo.
934    i = ' ' * 6
935    fmt = '{}{:23}{}\n'
936    if self.vbmeta_hash:
937      o.write(fmt.format(i, 'VBMeta hash:', binascii.hexlify(self.vbmeta_hash)))
938    if self.version_incremental:
939      o.write(fmt.format(i, 'Version incremental:', self.version_incremental))
940    if self.platform_key:
941      o.write(fmt.format(i, 'Platform key:', self.platform_key))
942    if self.manufacturer_key_hash:
943      o.write(fmt.format(i, 'Manufacturer key hash:',
944                         binascii.hexlify(self.manufacturer_key_hash)))
945    if self.description:
946      o.write(fmt.format(i, 'Description:', self.description))
947
948
949class AftlDescriptor(object):
950  """A class for the transparency log inclusion proof descriptor.
951
952  This encapsulates an AFTL ICP section with all information required to
953  validate an inclusion proof.
954
955  Attributes:
956    icp_header: A header for the section.
957    icp_entries: A list of AftlIcpEntry objects representing the inclusion
958        proofs.
959  """
960
961  def __init__(self, data=None):
962    """Initializes a new AftlDescriptor section.
963
964    Arguments:
965      data: If not None, must be a bytearray representing an AftlDescriptor.
966
967    Raises:
968      AftlError: If the data does not represent a well-formed AftlDescriptor.
969    """
970    if data:
971      icp_header_bytes = data[0:AftlIcpHeader.SIZE]
972      self.icp_header = AftlIcpHeader(icp_header_bytes)
973      if not self.icp_header.is_valid():
974        raise AftlError('Invalid ICP header.')
975      icp_count = self.icp_header.icp_count
976
977      # Jump past the header for entry deserialization.
978      icp_index = AftlIcpHeader.SIZE
979      # Validate each entry.
980      self.icp_entries = []
981      # Add_icp_entry updates entries and header, so set header count to
982      # compensate.
983      self.icp_header.icp_count = 0
984      for i in range(icp_count):
985        # Get the entry header from the AftlDescriptor.
986        cur_icp_entry = AftlIcpEntry(data[icp_index:])
987        cur_icp_entry_size = cur_icp_entry.get_expected_size()
988        # Now validate the entry structure.
989        if not cur_icp_entry.is_valid():
990          raise AftlError('Validation of ICP entry {} failed.'.format(i))
991        self.add_icp_entry(cur_icp_entry)
992        icp_index += cur_icp_entry_size
993    else:
994      self.icp_header = AftlIcpHeader()
995      self.icp_entries = []
996    if not self.is_valid():
997      raise AftlError('Malformed AFTLDescriptor')
998
999  def add_icp_entry(self, icp_entry):
1000    """Adds a new AftlIcpEntry to the AftlDescriptor, updating fields as needed.
1001
1002    Arguments:
1003      icp_entry: An AftlIcpEntry structure.
1004    """
1005    self.icp_entries.append(icp_entry)
1006    self.icp_header.icp_count += 1
1007    self.icp_header.aftl_descriptor_size += icp_entry.get_expected_size()
1008
1009  def verify_vbmeta_image(self, vbmeta_image, transparency_log_pub_keys):
1010    """Verifies the contained inclusion proof given the public log key.
1011
1012    Arguments:
1013      vbmeta_image: The vbmeta_image that should be verified against the
1014        inclusion proof.
1015      transparency_log_pub_keys: List of paths to PEM files containing trusted
1016        public keys that correspond with the transparency_logs.
1017
1018    Returns:
1019      True if all the inclusion proofs in the AfltDescriptor validate, are
1020      signed by one of the give transparency log public keys; otherwise false.
1021    """
1022    if not transparency_log_pub_keys or not self.icp_entries:
1023      return False
1024
1025    icp_verified = 0
1026    for icp_entry in self.icp_entries:
1027      verified = False
1028      for pub_key in transparency_log_pub_keys:
1029        if icp_entry.verify_vbmeta_image(vbmeta_image, pub_key):
1030          verified = True
1031          break
1032      if verified:
1033        icp_verified += 1
1034    return icp_verified == len(self.icp_entries)
1035
1036  def save(self, output):
1037    """Serializes the AftlDescriptor to disk.
1038
1039    Arguments:
1040      output: The object to write the descriptor to.
1041
1042    Raises:
1043      AftlError: If invalid descriptor structure.
1044    """
1045    output.write(self.encode())
1046
1047  def encode(self):
1048    """Serialize the AftlDescriptor to a bytearray().
1049
1050    Returns:
1051      A bytearray() with the encoded header.
1052
1053    Raises:
1054      AftlError: If invalid descriptor structure.
1055    """
1056    # The header and entries are guaranteed to be valid when encode is called.
1057    # Check the entire structure as a whole.
1058    if not self.is_valid():
1059      raise AftlError('Invalid AftlDescriptor structure.')
1060
1061    icp_descriptor = bytearray()
1062    icp_descriptor.extend(self.icp_header.encode())
1063    for icp_entry in self.icp_entries:
1064      icp_descriptor.extend(icp_entry.encode())
1065    return icp_descriptor
1066
1067  def is_valid(self):
1068    """Ensures that values in the AftlDescriptor are sane.
1069
1070    Returns:
1071      True if the values in the AftlDescriptor are sane, False otherwise.
1072    """
1073    if not self.icp_header.is_valid():
1074      return False
1075
1076    if self.icp_header.icp_count != len(self.icp_entries):
1077      return False
1078
1079    for icp_entry in self.icp_entries:
1080      if not icp_entry.is_valid():
1081        return False
1082    return True
1083
1084  def print_desc(self, o):
1085    """Print the descriptor.
1086
1087    Arguments:
1088      o: The object to write the output to.
1089    """
1090    o.write('Android Firmware Transparency Descriptor:\n')
1091    o.write('  Header:\n')
1092    self.icp_header.print_desc(o)
1093    for i, icp_entry in enumerate(self.icp_entries):
1094      o.write('  Entry #{}:\n'.format(i + 1))
1095      icp_entry.print_desc(o)
1096
1097
1098class AftlCommunication(object):
1099  """Class to abstract the communication layer with the transparency log."""
1100
1101  def __init__(self, transparency_log, timeout):
1102    """Initializes the object.
1103
1104    Arguments:
1105      transparency_log: String containing the URL of a transparency log server.
1106      timeout: Duration in seconds before requests to the AFTL times out. A
1107        value of 0 or None means there will be no timeout.
1108    """
1109    self.transparency_log = transparency_log
1110    if timeout:
1111      self.timeout = timeout
1112    else:
1113      self.timeout = None
1114
1115  def add_firmware_info(self, request):
1116    """Calls the AddFirmwareInfo RPC on the AFTL server.
1117
1118    Arguments:
1119      request: A AddFirmwareInfoRequest message.
1120
1121    Returns:
1122      An AddFirmwareInfoReponse message.
1123
1124    Raises:
1125      AftlError: If grpc or the proto modules cannot be loaded, if there is an
1126        error communicating with the log.
1127    """
1128    raise NotImplementedError(
1129        'AddFirmwareInfo() needs to be implemented by subclass.')
1130
1131
1132class AftlGrpcCommunication(AftlCommunication):
1133  """Class that implements GRPC communication to the AFTL server."""
1134
1135  def add_firmware_info(self, request):
1136    """Calls the AddFirmwareInfo RPC on the AFTL server.
1137
1138    Arguments:
1139      request: A AddFirmwareInfoRequest message.
1140
1141    Returns:
1142      An AddFirmwareInfoReponse message.
1143
1144    Raises:
1145      AftlError: If grpc or the proto modules cannot be loaded, if there is an
1146        error communicating with the log.
1147    """
1148    # Import grpc now to avoid global dependencies as it otherwise breakes
1149    # running unittest with atest.
1150    try:
1151      import grpc
1152      from proto import api_pb2_grpc
1153    except ImportError as e:
1154      err_str = 'grpc can be installed with python pip install grpcio.\n'
1155      raise AftlError('Failed to import module: ({}).\n{}'.format(e, err_str))
1156
1157    # Set up the gRPC channel with the transparency log.
1158    sys.stdout.write('Preparing to request inclusion proof from {}. This could '
1159                     'take ~30 seconds for the process to complete.\n'.format(
1160                         self.transparency_log))
1161    channel = grpc.insecure_channel(self.transparency_log)
1162    stub = api_pb2_grpc.AFTLogStub(channel)
1163
1164    # Attempt to transmit to the transparency log.
1165    sys.stdout.write('ICP is about to be requested from transparency log '
1166                     'with domain {}.\n'.format(self.transparency_log))
1167    try:
1168      response = stub.AddFirmwareInfo(request, timeout=self.timeout)
1169    except grpc.RpcError as e:
1170      raise AftlError('Error: grpc failure ({})'.format(e))
1171    return response
1172
1173
1174class Aftl(avbtool.Avb):
1175  """Business logic for aftltool command-line tool."""
1176
1177  def get_vbmeta_image(self, image_filename):
1178    """Gets the VBMeta struct bytes from image.
1179
1180    Arguments:
1181      image_filename: Image file to get information from.
1182
1183    Returns:
1184      A tuple with following elements:
1185        1. A bytearray with the vbmeta structure or None if the file does not
1186           contain a VBMeta structure.
1187        2. The VBMeta image footer.
1188    """
1189    # Reads and parses the vbmeta image.
1190    try:
1191      image = avbtool.ImageHandler(image_filename)
1192    except (IOError, ValueError) as e:
1193      sys.stderr.write('The image does not contain a valid VBMeta structure: '
1194                       '{}.\n'.format(e))
1195      return None, None
1196
1197    try:
1198      (footer, header, _, _) = self._parse_image(image)
1199    except avbtool.AvbError as e:
1200      sys.stderr.write('The image cannot be parsed: {}.\n'.format(e))
1201      return None, None
1202
1203    # Seeks for the start of the vbmeta image and calculates its size.
1204    offset = 0
1205    if footer:
1206      offset = footer.vbmeta_offset
1207    vbmeta_image_size = (offset + header.SIZE
1208                         + header.authentication_data_block_size
1209                         + header.auxiliary_data_block_size)
1210
1211    # Reads the vbmeta image bytes.
1212    try:
1213      image.seek(offset)
1214    except RuntimeError as e:
1215      sys.stderr.write('Given vbmeta image offset is invalid: {}.\n'.format(e))
1216      return None, None
1217    return image.read(vbmeta_image_size), footer
1218
1219  def get_aftl_descriptor(self, image_filename):
1220    """Gets the AftlDescriptor from image.
1221
1222    Arguments:
1223      image_filename: Image file to get information from.
1224
1225    Returns:
1226      An AftlDescriptor or None if the file does not contain a AftlDescriptor.
1227    """
1228    # Reads the vbmeta image bytes.
1229    vbmeta_image, _ = self.get_vbmeta_image(image_filename)
1230    if not vbmeta_image:
1231      return None
1232
1233    try:
1234      image = avbtool.ImageHandler(image_filename)
1235    except ValueError as e:
1236      sys.stderr.write('The image does not contain a valid VBMeta structure: '
1237                       '{}.\n'.format(e))
1238      return None
1239
1240    # Seeks for the start of the AftlDescriptor.
1241    try:
1242      image.seek(len(vbmeta_image))
1243    except RuntimeError as e:
1244      sys.stderr.write('Given AftlDescriptor image offset is invalid: {}.\n'
1245                       .format(e))
1246      return None
1247
1248    # Parses the header for the AftlDescriptor size.
1249    tmp_header_bytes = image.read(AftlIcpHeader.SIZE)
1250    if not tmp_header_bytes or len(tmp_header_bytes) != AftlIcpHeader.SIZE:
1251      sys.stderr.write('This image does not contain a AftlDescriptor.\n')
1252      return None
1253
1254    try:
1255      tmp_header = AftlIcpHeader(tmp_header_bytes)
1256    except AftlError as e:
1257      sys.stderr.write('This image does not contain a valid AftlDescriptor: '
1258                       '{}.\n'.format(e))
1259      return None
1260
1261    # Resets to the beginning of the AftlDescriptor.
1262    try:
1263      image.seek(len(vbmeta_image))
1264    except RuntimeError as e:
1265      sys.stderr.write('Given AftlDescriptor image offset is invalid: {}.\n'
1266                       .format(e))
1267      return None
1268
1269    # Parses the full AftlDescriptor.
1270    icp_bytes = image.read(tmp_header.aftl_descriptor_size)
1271    aftl_descriptor = None
1272    try:
1273      aftl_descriptor = AftlDescriptor(icp_bytes)
1274    except AftlError as e:
1275      sys.stderr.write('The image does not contain a valid AftlDescriptor: '
1276                       '{}.\n'.format(e))
1277    return aftl_descriptor
1278
1279  def info_image_icp(self, vbmeta_image_path, output):
1280    """Implements the 'info_image_icp' command.
1281
1282    Arguments:
1283      vbmeta_image_path: Image file to get information from.
1284      output: Output file to write human-readable information to (file object).
1285
1286    Returns:
1287      True if the given image has an AftlDescriptor and could successfully
1288      be processed; otherwise False.
1289    """
1290    aftl_descriptor = self.get_aftl_descriptor(vbmeta_image_path)
1291    if not aftl_descriptor:
1292      return False
1293    aftl_descriptor.print_desc(output)
1294    return True
1295
1296  def verify_image_icp(self, vbmeta_image_path, transparency_log_pub_keys,
1297                       output):
1298    """Implements the 'verify_image_icp' command.
1299
1300    Arguments:
1301      vbmeta_image_path: Image file to get information from.
1302      transparency_log_pub_keys: List of paths to PEM files containing trusted
1303        public keys that correspond with the transparency_logs.
1304      output: Output file to write human-readable information to (file object).
1305
1306    Returns:
1307      True if for the given image the inclusion proof validates; otherwise
1308      False.
1309    """
1310    vbmeta_image, _ = self.get_vbmeta_image(vbmeta_image_path)
1311    aftl_descriptor = self.get_aftl_descriptor(vbmeta_image_path)
1312    if not aftl_descriptor or not vbmeta_image:
1313      return False
1314    verified = aftl_descriptor.verify_vbmeta_image(vbmeta_image,
1315                                                   transparency_log_pub_keys)
1316    if not verified:
1317      sys.stderr.write('The inclusion proofs for the image do not validate.\n')
1318      return False
1319    sys.stdout.write('The inclusion proofs for the image successfully '
1320                     'validate.\n')
1321    return True
1322
1323  def request_inclusion_proof(self, transparency_log, vbmeta_descriptor,
1324                              version_inc, manufacturer_key_path,
1325                              signing_helper, signing_helper_with_files,
1326                              timeout, aftl_comms=None):
1327    """Packages and sends a request to the specified transparency log.
1328
1329    Arguments:
1330      transparency_log: String containing the URL of a transparency log server.
1331      vbmeta_descriptor: A bytearray with the vbmeta descriptor.
1332      version_inc: Subcomponent of the build fingerprint.
1333      manufacturer_key_path: Path to key used to sign messages sent to the
1334        transparency log servers.
1335      signing_helper: Program which signs a hash and returns a signature.
1336      signing_helper_with_files: Same as signing_helper but uses files instead.
1337      timeout: Duration in seconds before requests to the transparency log
1338        timeout.
1339      aftl_comms: A subclass of the AftlCommunication class. The default is
1340        to use AftlGrpcCommunication.
1341
1342    Returns:
1343      An AftlIcpEntry with the inclusion proof for the log entry.
1344
1345    Raises:
1346      AftlError: If grpc or the proto modules cannot be loaded, if there is an
1347         error communicating with the log, if the manufacturer_key_path
1348         cannot be decoded, or if the log submission cannot be signed.
1349    """
1350    # Calculate the hash of the vbmeta image.
1351    vbmeta_hash = hashlib.sha256(vbmeta_descriptor).digest()
1352
1353    # Extract the key data from the PEM file if of size 4096.
1354    manufacturer_key = avbtool.RSAPublicKey(manufacturer_key_path)
1355    if manufacturer_key.num_bits != 4096:
1356      raise AftlError('Manufacturer keys not of size 4096: {}'.format(
1357          manufacturer_key.num_bits))
1358    manufacturer_key_data = rsa_key_read_pem_bytes(manufacturer_key_path)
1359
1360    # Calculate the hash of the manufacturer key data.
1361    m_key_hash = hashlib.sha256(manufacturer_key_data).digest()
1362
1363    # Create an AddFirmwareInfoRequest protobuf for transmission to AFTL.
1364    fw_info = aftl_pb2.FirmwareInfo(vbmeta_hash=vbmeta_hash,
1365                                    version_incremental=version_inc,
1366                                    manufacturer_key_hash=m_key_hash)
1367    signed_fw_info = bytearray()
1368    # AFTL supports SHA256_RSA4096 for now, more will be available.
1369    algorithm_name = 'SHA256_RSA4096'
1370    sig_num_bytes = 0
1371    alg_padding = ''
1372    try:
1373      alg = avbtool.ALGORITHMS[algorithm_name]
1374      sig_num_bytes = alg.signature_num_bytes
1375      alg_padding = alg.padding
1376    except KeyError:
1377      raise AftlError('Unknown algorithm with name {}'.format(algorithm_name))
1378
1379    fw_info_hash = hashlib.sha256(fw_info.SerializeToString()).digest()
1380    padding_and_hash = str(bytearray(alg_padding)) + fw_info_hash
1381    try:
1382      signed_fw_info = avbtool.raw_sign(signing_helper,
1383                                        signing_helper_with_files,
1384                                        algorithm_name,
1385                                        sig_num_bytes,
1386                                        manufacturer_key_path,
1387                                        padding_and_hash)
1388    except avbtool.AvbError as e:
1389      raise AftlError('Failed to sign FirmwareInfo with '
1390                      '--manufacturer_key: {}'.format(e))
1391    fw_info_sig = sigpb.sigpb_pb2.DigitallySigned(
1392        hash_algorithm='SHA256',
1393        signature_algorithm='RSA',
1394        signature=str(signed_fw_info))
1395
1396    sfw_info = aftl_pb2.SignedFirmwareInfo(info=fw_info,
1397                                           info_signature=fw_info_sig)
1398    request = api_pb2.AddFirmwareInfoRequest(vbmeta=bytes(
1399        str(vbmeta_descriptor)), fw_info=sfw_info)
1400
1401    # Submit signed FirmwareInfo to the server.
1402    if not aftl_comms:
1403      aftl_comms = AftlGrpcCommunication(transparency_log, timeout)
1404    response = aftl_comms.add_firmware_info(request)
1405
1406    # Return an AftlIcpEntry representing this response.
1407    icp_entry = AftlIcpEntry()
1408    icp_entry.translate_response(transparency_log, response)
1409    return icp_entry
1410
1411  def make_icp_from_vbmeta(self, vbmeta_image_path, output,
1412                           signing_helper, signing_helper_with_files,
1413                           version_incremental, transparency_log_servers,
1414                           transparency_log_pub_keys, manufacturer_key,
1415                           padding_size, timeout):
1416    """Generates a vbmeta image with inclusion proof given a vbmeta image.
1417
1418    The descriptor (struct AftlDescriptor) contains the information required to
1419    validate an inclusion proof for a specific vbmeta image. It consists
1420    of a header (struct AftlIcpHeader) and zero or more entry structures
1421    (struct AftlIcpEntry) that contain the vbmeta leaf hash, tree size,
1422    root hash, inclusion proof hashes, and the signature for the root hash.
1423
1424    The vbmeta image, its hash, the version_incremental part of the build
1425    fingerprint, and the hash of the manufacturer key are sent to the
1426    transparency log, with the message signed by the manufacturer key.
1427    An inclusion proof is calculated and returned. This inclusion proof is
1428    then packaged in a AftlDescriptor structure. The existing vbmeta data is
1429    copied to a new file, appended with the AftlDescriptor data, and written to
1430    output. Validation of the inclusion proof does not require
1431    communication with the transparency log.
1432
1433    Arguments:
1434      vbmeta_image_path: Path to a vbmeta image file.
1435      output: File to write the results to.
1436      signing_helper: Program which signs a hash and returns a signature.
1437      signing_helper_with_files: Same as signing_helper but uses files instead.
1438      version_incremental: A string representing the subcomponent of the
1439        build fingerprint used to identify the vbmeta in the transparency log.
1440      transparency_log_servers: List of strings containing URLs of transparency
1441        log servers where inclusion proofs are requested from.
1442      transparency_log_pub_keys: List of paths to PEM files containing trusted
1443        public keys that correspond with the transparency_logs. There must be
1444        the same number of keys as log servers and they must be in the same
1445        order, that is, transparency_log_pub_keys[n] corresponds to
1446        transparency_log_servers[n].
1447      manufacturer_key: Path to PEM file containting the key file used to sign
1448        messages sent to the transparency log servers.
1449      padding_size: If not 0, pads output so size is a multiple of the number.
1450      timeout: Duration in seconds before requests to the AFTL times out. A
1451        value of 0 or None means there will be no timeout.
1452
1453    Returns:
1454      True if the inclusion proofs could be fetched from the transparency log
1455      servers and could be successfully validated; otherwise False.
1456    """
1457    # Validates command line parameters.
1458    if len(transparency_log_servers) != len(transparency_log_pub_keys):
1459      sys.stderr.write('Transparency log count and public key count mismatch: '
1460                       '{} servers and {} public keys.\n'.format(
1461                           len(transparency_log_servers),
1462                           len(transparency_log_pub_keys)))
1463      return False
1464    transparency_logs = zip(transparency_log_servers, transparency_log_pub_keys)
1465
1466    # Retrieves vbmeta structure from given partition image.
1467    vbmeta_image, footer = self.get_vbmeta_image(vbmeta_image_path)
1468
1469    # Fetches inclusion proofs for vbmeta structure from all transparency logs.
1470    aftl_descriptor = AftlDescriptor()
1471    for host, pub_key in transparency_logs:
1472      try:
1473        icp_entry = self.request_inclusion_proof(host, vbmeta_image,
1474                                                 version_incremental,
1475                                                 manufacturer_key,
1476                                                 signing_helper,
1477                                                 signing_helper_with_files,
1478                                                 timeout)
1479        if not icp_entry.verify_vbmeta_image(vbmeta_image, pub_key):
1480          sys.stderr.write('The inclusion proof from {} could not be verified.'
1481                           '\n'.format(host))
1482        aftl_descriptor.add_icp_entry(icp_entry)
1483      except AftlError as e:
1484        # The inclusion proof request failed. Continue and see if others will.
1485        sys.stderr.write('Requesting inclusion proof failed: {}.\n'.format(e))
1486        continue
1487
1488    # Checks that the resulting AftlDescriptor is sane.
1489    if aftl_descriptor.icp_header.icp_count != len(transparency_log_pub_keys):
1490      sys.stderr.write('Valid inclusion proofs could only be retrieved from {} '
1491                       'out of {} transparency logs.\n'
1492                       .format(aftl_descriptor.icp_header.icp_count,
1493                               len(transparency_log_pub_keys)))
1494      return False
1495    if not aftl_descriptor.is_valid():
1496      sys.stderr.write('Resulting AftlDescriptor structure is malformed.\n')
1497      return False
1498    if not aftl_descriptor.verify_vbmeta_image(vbmeta_image,
1499                                               transparency_log_pub_keys):
1500      sys.stderr.write('Resulting AftlDescriptor inclusion proofs do not '
1501                       'validate.\n')
1502      return False
1503
1504    # Write the original vbmeta descriptor, followed by the AftlDescriptor.
1505    if footer:  # Checks if it is a chained partition.
1506      # TODO(b/147217370): Determine the best way to handle chained partitions
1507      # like the system.img. Currently, we only put the main vbmeta.img in the
1508      # transparency log.
1509      sys.stderr.write('Image has a footer and ICP for this format is not '
1510                       'implemented.\n')
1511      return False
1512
1513    # Writes vbmeta image with inclusion proof into a new vbmeta image.
1514    output.seek(0)
1515    output.write(vbmeta_image)
1516    encoded_aftl_descriptor = aftl_descriptor.encode()
1517    output.write(encoded_aftl_descriptor)
1518
1519    if padding_size > 0:
1520      descriptor_size = len(vbmeta_image) + len(encoded_aftl_descriptor)
1521      padded_size = avbtool.round_to_multiple(descriptor_size, padding_size)
1522      padding_needed = padded_size - descriptor_size
1523      output.write('\0' * padding_needed)
1524
1525    sys.stdout.write('VBMeta image with AFTL inclusion proofs successfully '
1526                     'created.\n')
1527    return True
1528
1529  def _load_test_process_function(self, vbmeta_image_path,
1530                                  transparency_log_server,
1531                                  transparency_log_pub_key, manufacturer_key,
1532                                  process_number, submission_count,
1533                                  preserve_icp_images, timeout, result_queue):
1534    """Function to be used by multiprocessing.Process.
1535
1536    Arguments:
1537      vbmeta_image_path: Path to a vbmeta image file.
1538      transparency_log_server: A string in host:port format of transparency log
1539        servers where a inclusion proof is requested from.
1540      transparency_log_pub_key: Path to PEM file containing trusted
1541        public keys that corresponds with the transparency_log_server.
1542      manufacturer_key: Path to PEM file containting the key file used to sign
1543        messages sent to the transparency log servers.
1544      process_number: The number of the processes executing the function.
1545      submission_count: Number of total submissions to perform per
1546        process_count.
1547      preserve_icp_images: Boolean to indicate if the generated vbmeta
1548        image files with inclusion proofs should preserved.
1549      timeout: Duration in seconds before requests to the AFTL times out. A
1550        value of 0 or None means there will be no timeout.
1551      result_queue: Multiprocessing.Queue object for posting execution results.
1552    """
1553    for count in range(0, submission_count):
1554      version_incremental = 'aftl_load_testing_{}_{}'.format(process_number,
1555                                                             count)
1556      output_file = '{}_icp.img'.format(version_incremental)
1557      output = open(output_file, 'wb')
1558
1559      # Instrumented section.
1560      start_time = time.time()
1561      result = self.make_icp_from_vbmeta(
1562          vbmeta_image_path=vbmeta_image_path,
1563          output=output,
1564          signing_helper=None,
1565          signing_helper_with_files=None,
1566          version_incremental=version_incremental,
1567          transparency_log_servers=[transparency_log_server],
1568          transparency_log_pub_keys=[transparency_log_pub_key],
1569          manufacturer_key=manufacturer_key,
1570          padding_size=0,
1571          timeout=timeout)
1572      end_time = time.time()
1573
1574      output.close()
1575      if not preserve_icp_images:
1576        os.unlink(output_file)
1577
1578      # Puts the result onto the result queue.
1579      execution_time = end_time - start_time
1580      result_queue.put((start_time, end_time, execution_time,
1581                        version_incremental, result))
1582
1583  def load_test_aftl(self, vbmeta_image_path, output, transparency_log_server,
1584                     transparency_log_pub_key, manufacturer_key,
1585                     process_count, submission_count, stats_filename,
1586                     preserve_icp_images, timeout):
1587    """Performs multi-threaded load test on a given AFTL and prints stats.
1588
1589    Arguments:
1590      vbmeta_image_path: Path to a vbmeta image file.
1591      output: File to write the report to.
1592      transparency_log_server: A string in host:port format of transparency log
1593        servers where a inclusion proof is requested from.
1594      transparency_log_pub_key: Path to PEM file containing trusted
1595        public keys that corresponds with the transparency_log_server.
1596      manufacturer_key: Path to PEM file containting the key file used to sign
1597        messages sent to the transparency log servers.
1598      process_count: Number of processes used for parallel testing.
1599      submission_count: Number of total submissions to perform per
1600        process_count.
1601      stats_filename: Path to the stats file to write the raw execution data to.
1602      preserve_icp_images: Boolean to indicate if the generated vbmeta
1603        image files with inclusion proofs should preserved.
1604      timeout: Duration in seconds before requests to the AFTL times out. A
1605        value of 0 or None means there will be no timeout.
1606
1607    Returns:
1608      True if the load tested succeeded without errors; otherwise False.
1609    """
1610    if process_count < 1 or submission_count < 1:
1611      sys.stderr.write('Values for --processes/--submissions '
1612                       'must be at least 1.\n')
1613      return False
1614
1615    if not stats_filename:
1616      stats_filename = 'load_test_p{}_s{}.csv'.format(process_count,
1617                                                      submission_count)
1618    try:
1619      stats_file = open(stats_filename, 'w')
1620      stats_file.write('start_time,end_time,execution_time,version_incremental,'
1621                       'result\n')
1622    except IOError as e:
1623      sys.stderr.write('Could not open stats file {}: {}.\n'
1624                       .format(stats_file, e))
1625      return False
1626
1627    # Launch all the processes with their workloads.
1628    result_queue = multiprocessing.Queue()
1629    processes = set()
1630    execution_times = []
1631    results = []
1632    for i in range(0, process_count):
1633      p = multiprocessing.Process(
1634          target=self._load_test_process_function,
1635          args=(vbmeta_image_path, transparency_log_server,
1636                transparency_log_pub_key, manufacturer_key, i, submission_count,
1637                preserve_icp_images, timeout, result_queue))
1638      p.start()
1639      processes.add(p)
1640
1641    while processes:
1642      # Processes the results queue and writes these to a stats file.
1643      try:
1644        (start_time, end_time, execution_time, version_incremental,
1645         result) = result_queue.get(timeout=1)
1646        stats_file.write('{},{},{},{},{}\n'.format(start_time, end_time,
1647                                                   execution_time,
1648                                                   version_incremental, result))
1649        execution_times.append(execution_time)
1650        results.append(result)
1651      except Queue.Empty:
1652        pass
1653
1654      # Checks if processes are still alive; if not clean them up. join() would
1655      # have been nicer but we want to continously stream out the stats to file.
1656      for p in processes.copy():
1657        if not p.is_alive():
1658          processes.remove(p)
1659
1660    # Prepares stats.
1661    executions = sorted(execution_times)
1662    execution_count = len(execution_times)
1663    median = 0
1664    if execution_count % 2 == 0:
1665      median = (executions[execution_count // 2 - 1]
1666                + executions[execution_count // 2]) / 2
1667    else:
1668      median = executions[execution_count // 2]
1669
1670    # Outputs the stats report.
1671    o = output
1672    o.write('Load testing results:\n')
1673    o.write('  Processes:               {}\n'.format(process_count))
1674    o.write('  Submissions per process: {}\n'.format(submission_count))
1675    o.write('\n')
1676    o.write('  Submissions:\n')
1677    o.write('    Total:                 {}\n'.format(len(executions)))
1678    o.write('    Succeeded:             {}\n'.format(results.count(True)))
1679    o.write('    Failed:                {}\n'.format(results.count(False)))
1680    o.write('\n')
1681    o.write('  Submission execution durations:\n')
1682    o.write('    Average:               {:.2f} sec\n'.format(
1683        sum(execution_times) / execution_count))
1684    o.write('    Median:                {:.2f} sec\n'.format(median))
1685    o.write('    Min:                   {:.2f} sec\n'.format(min(executions)))
1686    o.write('    Max:                   {:.2f} sec\n'.format(max(executions)))
1687
1688    # Close the stats file.
1689    stats_file.close()
1690    if results.count(False):
1691      return False
1692    return True
1693
1694
1695class AftlTool(avbtool.AvbTool):
1696  """Object for aftltool command-line tool."""
1697
1698  def __init__(self):
1699    """Initializer method."""
1700    self.aftl = Aftl()
1701    super(AftlTool, self).__init__()
1702
1703  def make_icp_from_vbmeta(self, args):
1704    """Implements the 'make_icp_from_vbmeta' sub-command."""
1705    args = self._fixup_common_args(args)
1706    return self.aftl.make_icp_from_vbmeta(args.vbmeta_image_path,
1707                                          args.output,
1708                                          args.signing_helper,
1709                                          args.signing_helper_with_files,
1710                                          args.version_incremental,
1711                                          args.transparency_log_servers,
1712                                          args.transparency_log_pub_keys,
1713                                          args.manufacturer_key,
1714                                          args.padding_size,
1715                                          args.timeout)
1716
1717  def info_image_icp(self, args):
1718    """Implements the 'info_image_icp' sub-command."""
1719    return self.aftl.info_image_icp(args.vbmeta_image_path.name, args.output)
1720
1721  def verify_image_icp(self, args):
1722    """Implements the 'verify_image_icp' sub-command."""
1723    return self.aftl.verify_image_icp(args.vbmeta_image_path.name,
1724                                      args.transparency_log_pub_keys,
1725                                      args.output)
1726
1727  def load_test_aftl(self, args):
1728    """Implements the 'load_test_aftl' sub-command."""
1729    return self.aftl.load_test_aftl(args.vbmeta_image_path,
1730                                    args.output,
1731                                    args.transparency_log_server,
1732                                    args.transparency_log_pub_key,
1733                                    args.manufacturer_key,
1734                                    args.processes,
1735                                    args.submissions,
1736                                    args.stats_file,
1737                                    args.preserve_icp_images,
1738                                    args.timeout)
1739
1740  def run(self, argv):
1741    """Command-line processor.
1742
1743    Arguments:
1744      argv: Pass sys.argv from main.
1745    """
1746    parser = argparse.ArgumentParser()
1747    subparsers = parser.add_subparsers(title='subcommands')
1748
1749    # Command: make_icp_from_vbmeta
1750    sub_parser = subparsers.add_parser('make_icp_from_vbmeta',
1751                                       help='Makes an ICP enhanced vbmeta image'
1752                                       ' from an existing vbmeta image.')
1753    sub_parser.add_argument('--output',
1754                            help='Output file name.',
1755                            type=argparse.FileType('wb'),
1756                            default=sys.stdout)
1757    sub_parser.add_argument('--vbmeta_image_path',
1758                            help='Path to a generate vbmeta image file.',
1759                            required=True)
1760    sub_parser.add_argument('--version_incremental',
1761                            help='Current build ID.',
1762                            required=True)
1763    sub_parser.add_argument('--manufacturer_key',
1764                            help='Path to the PEM file containing the '
1765                            'manufacturer key for use with the log.',
1766                            required=True)
1767    sub_parser.add_argument('--transparency_log_servers',
1768                            help='List of transparency log servers in '
1769                            'host:port format. This must not be None and must '
1770                            'be the same size as transparency_log_pub_keys. '
1771                            'Also, transparency_log_servers[n] must correspond '
1772                            'to transparency_log_pub_keys[n] for all values n.',
1773                            nargs='*',
1774                            required=True)
1775    sub_parser.add_argument('--transparency_log_pub_keys',
1776                            help='Paths to PEM files containing transparency '
1777                            'log server key(s). This must not be None and must '
1778                            'be the same size as transparency_log_servers. '
1779                            'Also, transparency_log_pub_keys[n] must '
1780                            'correspond to transparency_log_servers[n] for all '
1781                            'values n.',
1782                            nargs='*',
1783                            required=True)
1784    sub_parser.add_argument('--padding_size',
1785                            metavar='NUMBER',
1786                            help='If non-zero, pads output with NUL bytes so '
1787                            'its size is a multiple of NUMBER (default: 0)',
1788                            type=avbtool.parse_number,
1789                            default=0)
1790    sub_parser.add_argument('--timeout',
1791                            metavar='SECONDS',
1792                            help='Timeout in seconds for transparency log '
1793                            'requests (default: 600 sec). A value of 0 means '
1794                            'no timeout.',
1795                            type=avbtool.parse_number,
1796                            default=600)
1797    self._add_common_args(sub_parser)
1798    sub_parser.set_defaults(func=self.make_icp_from_vbmeta)
1799
1800    # Command: info_image_icp
1801    sub_parser = subparsers.add_parser(
1802        'info_image_icp',
1803        help='Show information about AFTL ICPs in vbmeta or footer.')
1804    sub_parser.add_argument('--vbmeta_image_path',
1805                            help='Path to vbmeta image for AFTL information.',
1806                            type=argparse.FileType('rb'),
1807                            required=True)
1808    sub_parser.add_argument('--output',
1809                            help='Write info to file',
1810                            type=argparse.FileType('wt'),
1811                            default=sys.stdout)
1812    sub_parser.set_defaults(func=self.info_image_icp)
1813
1814    # Arguments for verify_image_icp.
1815    sub_parser = subparsers.add_parser(
1816        'verify_image_icp',
1817        help='Verify AFTL ICPs in vbmeta or footer.')
1818
1819    sub_parser.add_argument('--vbmeta_image_path',
1820                            help='Image to verify the inclusion proofs.',
1821                            type=argparse.FileType('rb'),
1822                            required=True)
1823    sub_parser.add_argument('--transparency_log_pub_keys',
1824                            help='Paths to PEM files containing transparency '
1825                            'log server key(s). This must not be None.',
1826                            nargs='*',
1827                            required=True)
1828    sub_parser.add_argument('--output',
1829                            help='Write info to file',
1830                            type=argparse.FileType('wt'),
1831                            default=sys.stdout)
1832    sub_parser.set_defaults(func=self.verify_image_icp)
1833
1834    # Command: load_test_aftl
1835    sub_parser = subparsers.add_parser(
1836        'load_test_aftl',
1837        help='Perform load testing against one AFTL log server. Note: This MUST'
1838        ' not be performed against a production system.')
1839    sub_parser.add_argument('--vbmeta_image_path',
1840                            help='Path to a generate vbmeta image file.',
1841                            required=True)
1842    sub_parser.add_argument('--output',
1843                            help='Write report to file.',
1844                            type=argparse.FileType('wt'),
1845                            default=sys.stdout)
1846    sub_parser.add_argument('--manufacturer_key',
1847                            help='Path to the PEM file containing the '
1848                            'manufacturer key for use with the log.',
1849                            required=True)
1850    sub_parser.add_argument('--transparency_log_server',
1851                            help='Transparency log servers to test against in '
1852                            'host:port format.',
1853                            required=True)
1854    sub_parser.add_argument('--transparency_log_pub_key',
1855                            help='Paths to PEM file containing transparency '
1856                            'log server key.',
1857                            required=True)
1858    sub_parser.add_argument('--processes',
1859                            help='Number of parallel processes to use for '
1860                            'testing (default: 1).',
1861                            type=avbtool.parse_number,
1862                            default=1)
1863    sub_parser.add_argument('--submissions',
1864                            help='Number of submissions to perform against the '
1865                            'log per process (default: 1).',
1866                            type=avbtool.parse_number,
1867                            default=1)
1868    sub_parser.add_argument('--stats_file',
1869                            help='Path to the stats file to write the raw '
1870                            'execution data to (Default: '
1871                            'load_test_p[processes]_s[submissions].csv.')
1872    sub_parser.add_argument('--preserve_icp_images',
1873                            help='Boolean flag to indicate if the generated '
1874                            'vbmeta image files with inclusion proofs should '
1875                            'preserved.',
1876                            action='store_true')
1877    sub_parser.add_argument('--timeout',
1878                            metavar='SECONDS',
1879                            help='Timeout in seconds for transparency log '
1880                            'requests (default: 0). A value of 0 means '
1881                            'no timeout.',
1882                            type=avbtool.parse_number,
1883                            default=0)
1884    sub_parser.set_defaults(func=self.load_test_aftl)
1885
1886    args = parser.parse_args(argv[1:])
1887    try:
1888      success = args.func(args)
1889      if not success:
1890        # Signals to calling tools that the command has failed.
1891        sys.exit(1)
1892    except AftlError as e:
1893      # Signals to calling tools that an unhandled exeception occured.
1894      sys.stderr.write('Unhandled AftlError occured: {}\n'.format(e))
1895      sys.exit(2)
1896
1897if __name__ == '__main__':
1898  tool = AftlTool()
1899  tool.run(sys.argv)
1900