• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2
3# Copyright 2020, The Android Open Source Project
4#
5# Permission is hereby granted, free of charge, to any person
6# obtaining a copy of this software and associated documentation
7# files (the "Software"), to deal in the Software without
8# restriction, including without limitation the rights to use, copy,
9# modify, merge, publish, distribute, sublicense, and/or sell copies
10# of the Software, and to permit persons to whom the Software is
11# furnished to do so, subject to the following conditions:
12#
13# The above copyright notice and this permission notice shall be
14# included in all copies or substantial portions of the Software.
15#
16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23# SOFTWARE.
24#
25"""Command-line tool for AFTL support for Android Verified Boot images."""
26
27import abc
28import argparse
29import enum
30import hashlib
31import io
32import multiprocessing
33import os
34import queue
35import struct
36import subprocess
37import sys
38import tempfile
39import time
40
41# This is to work around temporarily with the issue that python3 does not permit
42# relative imports anymore going forward. This adds the proto directory relative
43# to the location of aftltool to the sys.path.
44# TODO(b/154068467): Implement proper importing of generated *_pb2 modules.
45EXEC_PATH = os.path.dirname(os.path.realpath(__file__))
46sys.path.append(os.path.join(EXEC_PATH, 'proto'))
47
48# pylint: disable=wrong-import-position,import-error
49import avbtool
50import api_pb2
51# pylint: enable=wrong-import-position,import-error
52
53
54class AftlError(Exception):
55  """Application-specific errors.
56
57  These errors represent issues for which a stack-trace should not be
58  presented.
59
60  Attributes:
61    message: Error message.
62  """
63
64  def __init__(self, message):
65    Exception.__init__(self, message)
66
67
68def rsa_key_read_pem_bytes(key_path):
69  """Reads the bytes out of the passed in PEM file.
70
71  Arguments:
72    key_path: A string containing the path to the PEM file.
73
74  Returns:
75    A bytearray containing the DER encoded bytes in the PEM file.
76
77  Raises:
78    AftlError: If openssl cannot decode the PEM file.
79  """
80  # Use openssl to decode the PEM file.
81  args = ['openssl', 'rsa', '-in', key_path, '-pubout', '-outform', 'DER']
82  p = subprocess.Popen(args,
83                       stdin=subprocess.PIPE,
84                       stdout=subprocess.PIPE,
85                       stderr=subprocess.PIPE)
86  (pout, perr) = p.communicate()
87  retcode = p.wait()
88  if retcode != 0:
89    raise AftlError('Error decoding: {}'.format(perr))
90  return pout
91
92
93def check_signature(log_root, log_root_sig,
94                    transparency_log_pub_key):
95  """Validates the signature provided by the transparency log.
96
97  Arguments:
98    log_root: The transparency log_root data structure.
99    log_root_sig: The signature of the transparency log_root data structure.
100    transparency_log_pub_key: The file path to the transparency log public key.
101
102  Returns:
103    True if the signature check passes, otherwise False.
104  """
105
106  logsig_tmp = tempfile.NamedTemporaryFile()
107  logsig_tmp.write(log_root_sig)
108  logsig_tmp.flush()
109  logroot_tmp = tempfile.NamedTemporaryFile()
110  logroot_tmp.write(log_root)
111  logroot_tmp.flush()
112
113  p = subprocess.Popen(['openssl', 'dgst', '-sha256', '-verify',
114                        transparency_log_pub_key,
115                        '-signature', logsig_tmp.name, logroot_tmp.name],
116                       stdin=subprocess.PIPE,
117                       stdout=subprocess.PIPE,
118                       stderr=subprocess.PIPE)
119
120  p.communicate()
121  retcode = p.wait()
122  if not retcode:
123    return True
124  return False
125
126
127# AFTL Merkle Tree Functionality
128def rfc6962_hash_leaf(leaf):
129  """RFC6962 hashing function for hashing leaves of a Merkle tree.
130
131  Arguments:
132    leaf: A bytearray containing the Merkle tree leaf to be hashed.
133
134  Returns:
135    A bytearray containing the RFC6962 SHA256 hash of the leaf.
136  """
137  hasher = hashlib.sha256()
138  # RFC6962 states a '0' byte should be prepended to the data.
139  # This is done in conjunction with the '1' byte for non-leaf
140  # nodes for 2nd preimage attack resistance.
141  hasher.update(b'\x00')
142  hasher.update(leaf)
143  return hasher.digest()
144
145
146def rfc6962_hash_children(l, r):
147  """Calculates the inner Merkle tree node hash of child nodes l and r.
148
149  Arguments:
150    l: A bytearray containing the left child node to be hashed.
151    r: A bytearray containing the right child node to be hashed.
152
153  Returns:
154    A bytearray containing the RFC6962 SHA256 hash of 1|l|r.
155  """
156  hasher = hashlib.sha256()
157  # RFC6962 states a '1' byte should be prepended to the concatenated data.
158  # This is done in conjunction with the '0' byte for leaf
159  # nodes for 2nd preimage attack resistance.
160  hasher.update(b'\x01')
161  hasher.update(l)
162  hasher.update(r)
163  return hasher.digest()
164
165
166def chain_border_right(seed, proof):
167  """Computes a subtree hash along the left-side tree border.
168
169  Arguments:
170    seed: A bytearray containing the starting hash.
171    proof: A list of bytearrays representing the hashes in the inclusion proof.
172
173  Returns:
174    A bytearray containing the left-side subtree hash.
175  """
176  for h in proof:
177    seed = rfc6962_hash_children(h, seed)
178  return seed
179
180
181def chain_inner(seed, proof, leaf_index):
182  """Computes a subtree hash on or below the tree's right border.
183
184  Arguments:
185    seed: A bytearray containing the starting hash.
186    proof: A list of bytearrays representing the hashes in the inclusion proof.
187    leaf_index: The current leaf index.
188
189  Returns:
190    A bytearray containing the subtree hash.
191  """
192  for i, h in enumerate(proof):
193    if leaf_index >> i & 1 == 0:
194      seed = rfc6962_hash_children(seed, h)
195    else:
196      seed = rfc6962_hash_children(h, seed)
197  return seed
198
199
200def root_from_icp(leaf_index, tree_size, proof, leaf_hash):
201  """Calculates the expected Merkle tree root hash.
202
203  Arguments:
204    leaf_index: The current leaf index.
205    tree_size: The number of nodes in the Merkle tree.
206    proof: A list of bytearrays containing the inclusion proof.
207    leaf_hash: A bytearray containing the initial leaf hash.
208
209  Returns:
210    A bytearray containing the calculated Merkle tree root hash.
211
212  Raises:
213    AftlError: If invalid parameters are passed in.
214  """
215  if leaf_index < 0:
216    raise AftlError('Invalid leaf_index value: {}'.format(leaf_index))
217  if tree_size < 0:
218    raise AftlError('Invalid tree_size value: {}'.format(tree_size))
219  if leaf_index >= tree_size:
220    err_str = 'leaf_index cannot be equal or larger than tree_size: {}, {}'
221    raise AftlError(err_str.format(leaf_index, tree_size))
222  if proof is None:
223    raise AftlError('Inclusion proof not provided.')
224  if leaf_hash is None:
225    raise AftlError('No leaf hash provided.')
226  # Calculate the point to split the proof into two parts.
227  # The split is where the paths to leaves diverge.
228  inner = (leaf_index ^ (tree_size - 1)).bit_length()
229  result = chain_inner(leaf_hash, proof[:inner], leaf_index)
230  result = chain_border_right(result, proof[inner:])
231  return result
232
233
234class AftlImageHeader(object):
235  """A class for representing the AFTL image header.
236
237  Attributes:
238    magic: Magic for identifying the AftlImage.
239    required_icp_version_major: The major version of AVB that wrote the entry.
240    required_icp_version_minor: The minor version of AVB that wrote the entry.
241    aftl_image_size: Total size of the AftlImage.
242    icp_count: Number of inclusion proofs represented in this structure.
243  """
244
245  SIZE = 18  # The size of the structure, in bytes
246  MAGIC = b'AFTL'
247  FORMAT_STRING = ('!4s2L'  # magic, major & minor version.
248                   'L'      # AFTL image size.
249                   'H')     # number of inclusion proof entries.
250
251  def __init__(self, data=None):
252    """Initializes a new AftlImageHeader object.
253
254    Arguments:
255      data: If not None, must be a bytearray of size |SIZE|.
256
257    Raises:
258      AftlError: If invalid structure for AftlImageHeader.
259    """
260    assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
261
262    if data:
263      (self.magic, self.required_icp_version_major,
264       self.required_icp_version_minor, self.aftl_image_size,
265       self.icp_count) = struct.unpack(self.FORMAT_STRING, data)
266    else:
267      self.magic = self.MAGIC
268      self.required_icp_version_major = avbtool.AVB_VERSION_MAJOR
269      self.required_icp_version_minor = avbtool.AVB_VERSION_MINOR
270      self.aftl_image_size = self.SIZE
271      self.icp_count = 0
272    if not self.is_valid():
273      raise AftlError('Invalid structure for AftlImageHeader.')
274
275  def encode(self):
276    """Serializes the AftlImageHeader |SIZE| to bytes.
277
278    Returns:
279      The encoded AftlImageHeader as bytes.
280
281    Raises:
282      AftlError: If invalid structure for AftlImageHeader.
283    """
284    if not self.is_valid():
285      raise AftlError('Invalid structure for AftlImageHeader')
286    return struct.pack(self.FORMAT_STRING, self.magic,
287                       self.required_icp_version_major,
288                       self.required_icp_version_minor,
289                       self.aftl_image_size,
290                       self.icp_count)
291
292  def is_valid(self):
293    """Ensures that values in the AftlImageHeader are sane.
294
295    Returns:
296      True if the values in the AftlImageHeader are sane, False otherwise.
297    """
298    if self.magic != AftlImageHeader.MAGIC:
299      sys.stderr.write(
300          'AftlImageHeader: magic value mismatch: {}\n'
301          .format(repr(self.magic)))
302      return False
303
304    if self.required_icp_version_major > avbtool.AVB_VERSION_MAJOR:
305      sys.stderr.write('AftlImageHeader: major version mismatch: {}\n'.format(
306          self.required_icp_version_major))
307      return False
308
309    if self.required_icp_version_minor > avbtool.AVB_VERSION_MINOR:
310      sys.stderr.write('AftlImageHeader: minor version mismatch: {}\n'.format(
311          self.required_icp_version_minor))
312      return False
313
314    if self.aftl_image_size < self.SIZE:
315      sys.stderr.write('AftlImageHeader: Invalid AFTL image size: {}\n'.format(
316          self.aftl_image_size))
317      return False
318
319    if self.icp_count < 0 or self.icp_count > 65535:
320      sys.stderr.write(
321          'AftlImageHeader: ICP entry count out of range: {}\n'.format(
322              self.icp_count))
323      return False
324    return True
325
326  def print_desc(self, o):
327    """Print the AftlImageHeader.
328
329    Arguments:
330      o: The object to write the output to.
331    """
332    o.write('  AFTL image header:\n')
333    i = ' ' * 4
334    fmt = '{}{:25}{}\n'
335    o.write(fmt.format(i, 'Major version:', self.required_icp_version_major))
336    o.write(fmt.format(i, 'Minor version:', self.required_icp_version_minor))
337    o.write(fmt.format(i, 'Image size:', self.aftl_image_size))
338    o.write(fmt.format(i, 'ICP entries count:', self.icp_count))
339
340
341class AftlIcpEntry(object):
342  """A class for the transparency log inclusion proof entries.
343
344  The data that represents each of the components of the ICP entry are stored
345  immediately following the ICP entry header. The format is log_url,
346  SignedLogRoot, and inclusion proof hashes.
347
348  Attributes:
349    log_url_size: Length of the string representing the transparency log URL.
350    leaf_index: Leaf index in the transparency log representing this entry.
351    log_root_descriptor_size: Size of the transparency log's SignedLogRoot.
352    annotation_leaf_size: Size of the SignedVBMetaPrimaryAnnotationLeaf passed
353        to the log.
354    log_root_sig_size: Size in bytes of the log_root_signature
355    proof_hash_count: Number of hashes comprising the inclusion proof.
356    inc_proof_size: The total size of the inclusion proof, in bytes.
357    log_url: The URL for the transparency log that generated this inclusion
358        proof.
359    log_root_descriptor: The data comprising the signed tree head structure.
360    annotation_leaf: The data comprising the SignedVBMetaPrimaryAnnotationLeaf
361        leaf.
362    log_root_signature: The data comprising the log root signature.
363    proofs: The hashes comprising the inclusion proof.
364
365  """
366  SIZE = 27  # The size of the structure, in bytes
367  FORMAT_STRING = ('!L'   # transparency log server url size
368                   'Q'    # leaf index
369                   'L'    # log root descriptor size
370                   'L'    # firmware info leaf size
371                   'H'    # log root signature size
372                   'B'    # number of hashes in the inclusion proof
373                   'L')   # size of the inclusion proof in bytes
374  # This header is followed by the log_url, log_root_descriptor,
375  # annotation leaf, log root signature, and the proofs elements.
376
377  def __init__(self, data=None):
378    """Initializes a new ICP entry object.
379
380    Arguments:
381      data: If not None, must be a bytearray of size >= |SIZE|.
382
383    Raises:
384      AftlError: If data does not represent a well-formed AftlIcpEntry.
385    """
386    # Assert the header structure is of a sane size.
387    assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
388
389    if data:
390      # Deserialize the header from the data.
391      (self._log_url_size_expected,
392       self.leaf_index,
393       self._log_root_descriptor_size_expected,
394       self._annotation_leaf_size_expected,
395       self._log_root_sig_size_expected,
396       self._proof_hash_count_expected,
397       self._inc_proof_size_expected) = struct.unpack(self.FORMAT_STRING,
398                                                      data[0:self.SIZE])
399
400      # Deserialize ICP entry components from the data.
401      expected_format_string = '{}s{}s{}s{}s{}s'.format(
402          self._log_url_size_expected,
403          self._log_root_descriptor_size_expected,
404          self._annotation_leaf_size_expected,
405          self._log_root_sig_size_expected,
406          self._inc_proof_size_expected)
407
408      (log_url, log_root_descriptor_bytes, annotation_leaf_bytes,
409       self.log_root_signature, proof_bytes) = struct.unpack(
410           expected_format_string, data[self.SIZE:self.get_expected_size()])
411
412      self.log_url = log_url.decode('ascii')
413      self.log_root_descriptor = TrillianLogRootDescriptor(
414          log_root_descriptor_bytes)
415
416      self.annotation_leaf = SignedVBMetaPrimaryAnnotationLeaf.parse(
417          annotation_leaf_bytes)
418
419      self.proofs = []
420      if self._proof_hash_count_expected > 0:
421        proof_idx = 0
422        hash_size = (self._inc_proof_size_expected
423                     // self._proof_hash_count_expected)
424        for _ in range(self._proof_hash_count_expected):
425          proof = proof_bytes[proof_idx:(proof_idx+hash_size)]
426          self.proofs.append(proof)
427          proof_idx += hash_size
428    else:
429      self.leaf_index = 0
430      self.log_url = ''
431      self.log_root_descriptor = TrillianLogRootDescriptor()
432      self.annotation_leaf = SignedVBMetaPrimaryAnnotationLeaf()
433      self.log_root_signature = b''
434      self.proofs = []
435    if not self.is_valid():
436      raise AftlError('Invalid structure for AftlIcpEntry')
437
438  @property
439  def log_url_size(self):
440    """Gets the size of the log_url attribute."""
441    if hasattr(self, 'log_url'):
442      return len(self.log_url)
443    return self._log_url_size_expected
444
445  @property
446  def log_root_descriptor_size(self):
447    """Gets the size of the log_root_descriptor attribute."""
448    if hasattr(self, 'log_root_descriptor'):
449      return self.log_root_descriptor.get_expected_size()
450    return self._log_root_descriptor_size_expected
451
452  @property
453  def annotation_leaf_size(self):
454    """Gets the size of the annotation_leaf attribute."""
455    if hasattr(self, 'annotation_leaf'):
456      return self.annotation_leaf.get_expected_size()
457    return self._annotation_leaf_size_expected
458
459  @property
460  def log_root_sig_size(self):
461    """Gets the size of the log_root signature."""
462    if hasattr(self, 'log_root_signature'):
463      return len(self.log_root_signature)
464    return self._log_root_sig_size_expected
465
466  @property
467  def proof_hash_count(self):
468    """Gets the number of proof hashes."""
469    if hasattr(self, 'proofs'):
470      return len(self.proofs)
471    return self._proof_hash_count_expected
472
473  @property
474  def inc_proof_size(self):
475    """Gets the total size of the proof hashes in bytes."""
476    if hasattr(self, 'proofs'):
477      result = 0
478      for proof in self.proofs:
479        result += len(proof)
480      return result
481    return self._inc_proof_size_expected
482
483  def verify_icp(self, transparency_log_pub_key):
484    """Verifies the contained inclusion proof given the public log key.
485
486    Arguments:
487      transparency_log_pub_key: The path to the trusted public key for the log.
488
489    Returns:
490      True if the calculated signature matches AftlIcpEntry's. False otherwise.
491    """
492    if not transparency_log_pub_key:
493      return False
494
495    leaf_hash = rfc6962_hash_leaf(self.annotation_leaf.encode())
496    calc_root = root_from_icp(self.leaf_index,
497                              self.log_root_descriptor.tree_size,
498                              self.proofs,
499                              leaf_hash)
500    if ((calc_root == self.log_root_descriptor.root_hash) and
501        check_signature(
502            self.log_root_descriptor.encode(),
503            self.log_root_signature,
504            transparency_log_pub_key)):
505      return True
506    return False
507
508  def verify_vbmeta_image(self, vbmeta_image, transparency_log_pub_key):
509    """Verify the inclusion proof for the given VBMeta image.
510
511    Arguments:
512      vbmeta_image: A bytearray with the VBMeta image.
513      transparency_log_pub_key: File path to the PEM file containing the trusted
514        transparency log public key.
515
516    Returns:
517      True if the inclusion proof validates and the vbmeta hash of the given
518      VBMeta image matches the one in the annotation leaf; otherwise False.
519    """
520    if not vbmeta_image:
521      return False
522
523    # Calculate the hash of the vbmeta image.
524    vbmeta_hash = hashlib.sha256(vbmeta_image).digest()
525
526    # Validates the inclusion proof and then compare the calculated vbmeta_hash
527    # against the one in the inclusion proof.
528    return (self.verify_icp(transparency_log_pub_key)
529            and self.annotation_leaf.annotation.vbmeta_hash == vbmeta_hash)
530
531  def encode(self):
532    """Serializes the header |SIZE| and data to bytes.
533
534    Returns:
535      bytes with the encoded header.
536
537    Raises:
538      AftlError: If invalid entry structure.
539    """
540    proof_bytes = bytearray()
541    if not self.is_valid():
542      raise AftlError('Invalid AftlIcpEntry structure')
543
544    expected_format_string = '{}{}s{}s{}s{}s{}s'.format(
545        self.FORMAT_STRING,
546        self.log_url_size,
547        self.log_root_descriptor_size,
548        self.annotation_leaf_size,
549        self.log_root_sig_size,
550        self.inc_proof_size)
551
552    for proof in self.proofs:
553      proof_bytes.extend(proof)
554
555    return struct.pack(expected_format_string,
556                       self.log_url_size, self.leaf_index,
557                       self.log_root_descriptor_size, self.annotation_leaf_size,
558                       self.log_root_sig_size, self.proof_hash_count,
559                       self.inc_proof_size, self.log_url.encode('ascii'),
560                       self.log_root_descriptor.encode(),
561                       self.annotation_leaf.encode(),
562                       self.log_root_signature,
563                       proof_bytes)
564
565  def translate_response(self, log_url, avbm_response):
566    """Translates an AddVBMetaResponse object to an AftlIcpEntry.
567
568    Arguments:
569      log_url: String representing the transparency log URL.
570      avbm_response: The AddVBMetaResponse object to translate.
571    """
572    self.log_url = log_url
573
574    # Deserializes from AddVBMetaResponse.
575    proof = avbm_response.annotation_proof
576    self.leaf_index = proof.proof.leaf_index
577    self.log_root_descriptor = TrillianLogRootDescriptor(proof.sth.log_root)
578    self.annotation_leaf = SignedVBMetaPrimaryAnnotationLeaf.parse(
579        avbm_response.annotation_leaf)
580    self.log_root_signature = proof.sth.log_root_signature
581    self.proofs = proof.proof.hashes
582
583  def get_expected_size(self):
584    """Gets the expected size of the full entry out of the header.
585
586    Returns:
587      The expected size of the AftlIcpEntry from the header.
588    """
589    return (self.SIZE + self.log_url_size + self.log_root_descriptor_size +
590            self.annotation_leaf_size + self.log_root_sig_size +
591            self.inc_proof_size)
592
593  def is_valid(self):
594    """Ensures that values in an AftlIcpEntry structure are sane.
595
596    Returns:
597      True if the values in the AftlIcpEntry are sane, False otherwise.
598    """
599    if self.leaf_index < 0:
600      sys.stderr.write('ICP entry: leaf index out of range: '
601                       '{}.\n'.format(self.leaf_index))
602      return False
603
604    if (not self.log_root_descriptor or
605        not isinstance(self.log_root_descriptor, TrillianLogRootDescriptor) or
606        not self.log_root_descriptor.is_valid()):
607      sys.stderr.write('ICP entry: invalid TrillianLogRootDescriptor.\n')
608      return False
609
610    if (not self.annotation_leaf or
611        not isinstance(self.annotation_leaf, Leaf)):
612      sys.stderr.write('ICP entry: invalid Leaf.\n')
613      return False
614    return True
615
616  def print_desc(self, o):
617    """Print the ICP entry.
618
619    Arguments:
620      o: The object to write the output to.
621    """
622    i = ' ' * 4
623    fmt = '{}{:25}{}\n'
624    o.write(fmt.format(i, 'Transparency Log:', self.log_url))
625    o.write(fmt.format(i, 'Leaf index:', self.leaf_index))
626    o.write('    ICP hashes:              ')
627    for i, proof_hash in enumerate(self.proofs):
628      if i != 0:
629        o.write(' ' * 29)
630      o.write('{}\n'.format(proof_hash.hex()))
631    self.log_root_descriptor.print_desc(o)
632    self.annotation_leaf.print_desc(o)
633
634
635class TrillianLogRootDescriptor(object):
636  """A class representing the Trillian log_root descriptor.
637
638  Taken from Trillian definitions:
639  https://github.com/google/trillian/blob/master/trillian.proto#L255
640
641  Attributes:
642    version: The version number of the descriptor. Currently only version=1 is
643        supported.
644    tree_size: The size of the tree.
645    root_hash_size: The size of the root hash in bytes. Valid values are between
646        0 and 128.
647    root_hash: The root hash as bytearray().
648    timestamp: The timestamp in nanoseconds.
649    revision: The revision number as long.
650    metadata_size: The size of the metadata in bytes. Valid values are between
651        0 and 65535.
652    metadata: The metadata as bytearray().
653  """
654  FORMAT_STRING_PART_1 = ('!H'  # version
655                          'Q'   # tree_size
656                          'B'   # root_hash_size
657                         )
658
659  FORMAT_STRING_PART_2 = ('!Q'  # timestamp
660                          'Q'   # revision
661                          'H'   # metadata_size
662                         )
663
664  def __init__(self, data=None):
665    """Initializes a new TrillianLogRoot descriptor."""
666    if data:
667      # Parses first part of the log_root descriptor.
668      data_length = struct.calcsize(self.FORMAT_STRING_PART_1)
669      (self.version, self.tree_size, self.root_hash_size) = struct.unpack(
670          self.FORMAT_STRING_PART_1, data[0:data_length])
671      data = data[data_length:]
672
673      # Parses the root_hash bytes if the size indicates existance.
674      if self.root_hash_size > 0:
675        self.root_hash = data[0:self.root_hash_size]
676        data = data[self.root_hash_size:]
677      else:
678        self.root_hash = b''
679
680      # Parses second part of the log_root descriptor.
681      data_length = struct.calcsize(self.FORMAT_STRING_PART_2)
682      (self.timestamp, self.revision, self.metadata_size) = struct.unpack(
683          self.FORMAT_STRING_PART_2, data[0:data_length])
684      data = data[data_length:]
685
686      # Parses the metadata if the size indicates existance.
687      if self.metadata_size > 0:
688        self.metadata = data[0:self.metadata_size]
689      else:
690        self.metadata = b''
691    else:
692      self.version = 1
693      self.tree_size = 0
694      self.root_hash_size = 0
695      self.root_hash = b''
696      self.timestamp = 0
697      self.revision = 0
698      self.metadata_size = 0
699      self.metadata = b''
700
701    if not self.is_valid():
702      raise AftlError('Invalid structure for TrillianLogRootDescriptor.')
703
704  def get_expected_size(self):
705    """Calculates the expected size of the TrillianLogRootDescriptor.
706
707    Returns:
708      The expected size of the TrillianLogRootDescriptor.
709    """
710    return (struct.calcsize(self.FORMAT_STRING_PART_1) + self.root_hash_size +
711            struct.calcsize(self.FORMAT_STRING_PART_2) + self.metadata_size)
712
713  def encode(self):
714    """Serializes the TrillianLogDescriptor to a bytearray().
715
716    Returns:
717      A bytearray() with the encoded header.
718
719    Raises:
720      AftlError: If invalid entry structure.
721    """
722    if not self.is_valid():
723      raise AftlError('Invalid structure for TrillianLogRootDescriptor.')
724
725    expected_format_string = '{}{}s{}{}s'.format(
726        self.FORMAT_STRING_PART_1,
727        self.root_hash_size,
728        self.FORMAT_STRING_PART_2[1:],
729        self.metadata_size)
730
731    return struct.pack(expected_format_string,
732                       self.version, self.tree_size, self.root_hash_size,
733                       self.root_hash, self.timestamp, self.revision,
734                       self.metadata_size, self.metadata)
735
736  def is_valid(self):
737    """Ensures that values in the descritor are sane.
738
739    Returns:
740      True if the values are sane; otherwise False.
741    """
742    cls = self.__class__.__name__
743    if self.version != 1:
744      sys.stderr.write('{}: Bad version value {}.\n'.format(cls, self.version))
745      return False
746    if self.tree_size < 0:
747      sys.stderr.write('{}: Bad tree_size value {}.\n'.format(cls,
748                                                              self.tree_size))
749      return False
750    if self.root_hash_size < 0 or self.root_hash_size > 128:
751      sys.stderr.write('{}: Bad root_hash_size value {}.\n'.format(
752          cls, self.root_hash_size))
753      return False
754    if len(self.root_hash) != self.root_hash_size:
755      sys.stderr.write('{}: root_hash_size {} does not match with length of '
756                       'root_hash {}.\n'.format(cls, self.root_hash_size,
757                                                len(self.root_hash)))
758      return False
759    if self.timestamp < 0:
760      sys.stderr.write('{}: Bad timestamp value {}.\n'.format(cls,
761                                                              self.timestamp))
762      return False
763    if self.revision < 0:
764      sys.stderr.write('{}: Bad revision value {}.\n'.format(cls,
765                                                             self.revision))
766      return False
767    if self.metadata_size < 0 or self.metadata_size > 65535:
768      sys.stderr.write('{}: Bad metadatasize value {}.\n'.format(
769          cls, self.metadata_size))
770      return False
771    if len(self.metadata) != self.metadata_size:
772      sys.stderr.write('{}: metadata_size {} does not match with length of'
773                       'metadata {}.\n'.format(cls, self.metadata_size,
774                                               len(self.metadata)))
775      return False
776    return True
777
778  def print_desc(self, o):
779    """Print the TrillianLogRootDescriptor.
780
781    Arguments:
782      o: The object to write the output to.
783    """
784    o.write('    Log Root Descriptor:\n')
785    i = ' ' * 6
786    fmt = '{}{:23}{}\n'
787    o.write(fmt.format(i, 'Version:', self.version))
788    o.write(fmt.format(i, 'Tree size:', self.tree_size))
789    o.write(fmt.format(i, 'Root hash size:', self.root_hash_size))
790    if self.root_hash_size > 0:
791      o.write(fmt.format(i, 'Root hash:', self.root_hash.hex()))
792      o.write(fmt.format(i, 'Timestamp (ns):', self.timestamp))
793    o.write(fmt.format(i, 'Revision:', self.revision))
794    o.write(fmt.format(i, 'Metadata size:', self.metadata_size))
795    if self.metadata_size > 0:
796      o.write(fmt.format(i, 'Metadata:', self.metadata.hex()))
797
798
799def tls_decode_bytes(byte_size, stream):
800  """Decodes a variable-length vector.
801
802  In the TLS presentation language, a variable-length vector is a pair
803  (size, value). |size| describes the size of the |value| to read
804  in bytes. All values are encoded in big-endian.
805  See https://tools.ietf.org/html/rfc8446#section-3 for more details.
806
807  Arguments:
808      byte_size: A format character as described in the struct module
809          which describes the expected length of the size. For
810          instance, "B", "H", "L" or "Q".
811      stream: a BytesIO which contains the value to decode.
812
813  Returns:
814    A bytes containing the value decoded.
815
816  Raises:
817    AftlError: If |byte_size| is not a known format character, or if not
818    enough data is available to decode the size or the value.
819  """
820  byte_size_format = "!" + byte_size
821  try:
822    byte_size_length = struct.calcsize(byte_size_format)
823  except struct.error:
824    raise AftlError("Invalid byte_size character: {}. It must be a "
825                    "format supported by struct.".format(byte_size))
826  try:
827    value_size = struct.unpack(byte_size_format,
828                               stream.read(byte_size_length))[0]
829  except struct.error:
830    raise AftlError("Not enough data to read size: {}".format(byte_size))
831  value = stream.read(value_size)
832  if value_size != len(value):
833    raise AftlError("Not enough data to read value: "
834                    "{} != {}".format(value_size, len(value)))
835  return value
836
837def tls_encode_bytes(byte_size, value, stream):
838  """Encodes a variable-length vector.
839
840  In the TLS presentation language, a variable-length vector is a pair
841  (size, value). |size| describes the size of the |value| to read
842  in bytes. All values are encoded in big-endian.
843  See https://tools.ietf.org/html/rfc8446#section-3 for more details.
844
845  Arguments:
846      byte_size: A format character as described in the struct module
847          which describes the expected length of the size. For
848          instance, "B", "H", "L" or "Q".
849      value: the value to encode. The length of |value| must be
850          representable with |byte_size|.
851      stream: a BytesIO to which the value is encoded to.
852
853  Raises:
854    AftlError: If |byte_size| is not a known format character, or if
855    |value|'s length cannot be represent with |byte_size|.
856  """
857  byte_size_format = "!" + byte_size
858  try:
859    stream.write(struct.pack(byte_size_format, len(value)))
860  except struct.error:
861    # Whether byte_size is invalid or not large enough to represent value,
862    # struct returns an struct.error exception. Instead of matching on the
863    # exception message, capture both cases in a generic message.
864    raise AftlError("Invalid byte_size to store {} bytes".format(len(value)))
865  stream.write(value)
866
867class HashAlgorithm(enum.Enum):
868  SHA256 = 0
869
870class SignatureAlgorithm(enum.Enum):
871  RSA = 0
872  ECDSA = 1
873
874class Signature(object):
875  """Represents a signature of some data.
876
877  It is usually made using a manufacturer key and used to sign part of a leaf
878  that belongs to the transparency log. The encoding of this structure must
879  match the server expectation.
880
881  Attributes:
882    hash_algorithm: the HashAlgorithm used for the signature.
883    signature_algorithm: the SignatureAlgorithm used.
884    signature: the raw signature in bytes.
885  """
886  FORMAT_STRING = ('!B'    # Hash algorithm
887                   'B'     # Signing algorithm
888                  )
889  # Followed by the raw signature, encoded as a TLS variable-length vector
890  # which size is represented using 2 bytes.
891
892  def __init__(self, hash_algorithm=HashAlgorithm.SHA256,
893               signature_algorithm=SignatureAlgorithm.RSA, signature=b''):
894    self.hash_algorithm = hash_algorithm
895    self.signature_algorithm = signature_algorithm
896    self.signature = signature
897
898  @classmethod
899  def parse(cls, stream):
900    """Parses a TLS-encoded structure and returns a new Signature.
901
902    Arguments:
903      stream: a BytesIO to read the signature from.
904
905    Returns:
906      A new Signature object.
907
908    Raises:
909      AftlError: If the hash algorithm or signature algorithm value is
910        unknown; or if the decoding failed.
911    """
912    data_length = struct.calcsize(cls.FORMAT_STRING)
913    (hash_algorithm, signature_algorithm) = struct.unpack(
914        cls.FORMAT_STRING, stream.read(data_length))
915    try:
916      hash_algorithm = HashAlgorithm(hash_algorithm)
917    except ValueError:
918      raise AftlError('unknown hash algorithm: {}'.format(hash_algorithm))
919    try:
920      signature_algorithm = SignatureAlgorithm(signature_algorithm)
921    except ValueError:
922      raise AftlError('unknown signature algorithm: {}'.format(
923          signature_algorithm))
924    signature = tls_decode_bytes('H', stream)
925    return Signature(hash_algorithm, signature_algorithm, signature)
926
927  def get_expected_size(self):
928    """Returns the size of the encoded Signature."""
929    return struct.calcsize(self.FORMAT_STRING) + \
930        struct.calcsize('H') + len(self.signature)
931
932  def encode(self, stream):
933    """Encodes the Signature.
934
935    Arguments:
936      stream: a BytesIO to which the signature is written.
937    """
938    stream.write(struct.pack(self.FORMAT_STRING, self.hash_algorithm.value,
939                             self.signature_algorithm.value))
940    tls_encode_bytes('H', self.signature, stream)
941
942class VBMetaPrimaryAnnotation(object):
943  """An annotation that contains metadata about a VBMeta image.
944
945  Attributes:
946    vbmeta_hash: the SHA256 of the VBMeta it references.
947    version_incremental: the version incremental of the build, as string.
948    manufacturer_key_hash: the hash of the manufacturer key that will
949        sign this annotation.
950    description: a free-form field.
951  """
952
953  def __init__(self, vbmeta_hash=b'', version_incremental='',
954               manufacturer_key_hash=b'', description=''):
955    """Default constructor."""
956    self.vbmeta_hash = vbmeta_hash
957    self.version_incremental = version_incremental
958    self.manufacturer_key_hash = manufacturer_key_hash
959    self.description = description
960
961  @classmethod
962  def parse(cls, stream):
963    """Parses a VBMetaPrimaryAnnotation from data.
964
965    Arguments:
966      stream: an io.BytesIO to decode the annotation from.
967
968    Returns:
969      A new VBMetaPrimaryAnnotation.
970
971    Raises:
972      AftlError: If an error occured while parsing the annotation.
973    """
974    vbmeta_hash = tls_decode_bytes("B", stream)
975    version_incremental = tls_decode_bytes("B", stream)
976    try:
977      version_incremental = version_incremental.decode("ascii")
978    except UnicodeError:
979      raise AftlError('Failed to convert version incremental to an ASCII'
980                      'string')
981    manufacturer_key_hash = tls_decode_bytes("B", stream)
982    description = tls_decode_bytes("H", stream)
983    try:
984      description = description.decode("utf-8")
985    except UnicodeError:
986      raise AftlError('Failed to convert description to an UTF-8 string')
987    return cls(vbmeta_hash, version_incremental, manufacturer_key_hash,
988               description)
989
990  def sign(self, manufacturer_key_path, signing_helper=None,
991           signing_helper_with_files=None):
992    """Signs the annotation.
993
994    Arguments:
995      manufacturer_key_path: Path to key used to sign messages sent to the
996        transparency log servers.
997      signing_helper: Program which signs a hash and returns a signature.
998      signing_helper_with_files: Same as signing_helper but uses files instead.
999
1000    Returns:
1001      A new SignedVBMetaPrimaryAnnotation.
1002
1003    Raises:
1004      AftlError: If an error occured while signing the annotation.
1005    """
1006    # AFTL supports SHA256_RSA4096 for now, more will be available.
1007    algorithm_name = 'SHA256_RSA4096'
1008    encoded_leaf = io.BytesIO()
1009    self.encode(encoded_leaf)
1010    try:
1011      rsa_key = avbtool.RSAPublicKey(manufacturer_key_path)
1012      raw_signature = rsa_key.sign(algorithm_name, encoded_leaf.getvalue(),
1013                                   signing_helper, signing_helper_with_files)
1014    except avbtool.AvbError as e:
1015      raise AftlError('Failed to sign VBMetaPrimaryAnnotation with '
1016                      '--manufacturer_key: {}'.format(e))
1017    signature = Signature(hash_algorithm=HashAlgorithm.SHA256,
1018                          signature_algorithm=SignatureAlgorithm.RSA,
1019                          signature=raw_signature)
1020    return SignedVBMetaPrimaryAnnotation(signature=signature, annotation=self)
1021
1022  def encode(self, stream):
1023    """Encodes the VBMetaPrimaryAnnotation.
1024
1025    Arguments:
1026      stream: a BytesIO to which the signature is written.
1027
1028    Raises:
1029      AftlError: If the encoding failed.
1030    """
1031    tls_encode_bytes("B", self.vbmeta_hash, stream)
1032    try:
1033      tls_encode_bytes("B", self.version_incremental.encode("ascii"), stream)
1034    except UnicodeError:
1035      raise AftlError('Unable to encode version incremental to ASCII')
1036    tls_encode_bytes("B", self.manufacturer_key_hash, stream)
1037    try:
1038      tls_encode_bytes("H", self.description.encode("utf-8"), stream)
1039    except UnicodeError:
1040      raise AftlError('Unable to encode description to UTF-8')
1041
1042  def get_expected_size(self):
1043    """Returns the size of the encoded annotation."""
1044    b = io.BytesIO()
1045    self.encode(b)
1046    return len(b.getvalue())
1047
1048  def print_desc(self, o):
1049    """Print the VBMetaPrimaryAnnotation.
1050
1051    Arguments:
1052      o: The object to write the output to.
1053    """
1054    o.write('      VBMeta Primary Annotation:\n')
1055    i = ' ' * 8
1056    fmt = '{}{:23}{}\n'
1057    if self.vbmeta_hash:
1058      o.write(fmt.format(i, 'VBMeta hash:', self.vbmeta_hash.hex()))
1059    if self.version_incremental:
1060      o.write(fmt.format(i, 'Version incremental:', self.version_incremental))
1061    if self.manufacturer_key_hash:
1062      o.write(fmt.format(i, 'Manufacturer key hash:',
1063                         self.manufacturer_key_hash.hex()))
1064    if self.description:
1065      o.write(fmt.format(i, 'Description:', self.description))
1066
1067
1068class SignedVBMetaPrimaryAnnotation(object):
1069  """A Signed VBMetaPrimaryAnnotation.
1070
1071  Attributes:
1072    signature: a Signature.
1073    annotation: a VBMetaPrimaryAnnotation.
1074  """
1075
1076  def __init__(self, signature=None, annotation=None):
1077    """Default constructor."""
1078    if not signature:
1079      signature = Signature()
1080    self.signature = signature
1081    if not annotation:
1082      annotation = VBMetaPrimaryAnnotation()
1083    self.annotation = annotation
1084
1085  @classmethod
1086  def parse(cls, stream):
1087    """Parses a signed annotation."""
1088    signature = Signature.parse(stream)
1089    annotation = VBMetaPrimaryAnnotation.parse(stream)
1090    return cls(signature, annotation)
1091
1092  def get_expected_size(self):
1093    """Returns the size of the encoded signed annotation."""
1094    return self.signature.get_expected_size() + \
1095             self.annotation.get_expected_size()
1096
1097  def encode(self, stream):
1098    """Encodes the SignedVBMetaPrimaryAnnotation.
1099
1100    Arguments:
1101      stream: a BytesIO to which the object is written.
1102
1103    Raises:
1104      AftlError: If the encoding failed.
1105    """
1106    self.signature.encode(stream)
1107    self.annotation.encode(stream)
1108
1109  def print_desc(self, o):
1110    """Prints the annotation.
1111
1112    Arguments:
1113      o: The object to write the output to.
1114    """
1115    self.annotation.print_desc(o)
1116
1117class Leaf(abc.ABC):
1118  """An abstract class to represent the leaves in the transparency log."""
1119  FORMAT_STRING = ('!B'   # Version
1120                   'Q'    # Timestamp
1121                   'B'    # LeafType
1122                  )
1123
1124  class LeafType(enum.Enum):
1125    VBMetaType = 0
1126    SignedVBMetaPrimaryAnnotationType = 1
1127
1128  def __init__(self, version=1, timestamp=0, leaf_type=LeafType.VBMetaType):
1129    """Build a new leaf."""
1130    self.version = version
1131    self.timestamp = timestamp
1132    self.leaf_type = leaf_type
1133
1134  @classmethod
1135  def _parse_header(cls, stream):
1136    """Parses the header of a leaf.
1137
1138    This is called with the parse method of the subclasses.
1139
1140    Arguments:
1141      stream: a BytesIO to read the header from.
1142
1143    Returns:
1144      A tuple (version, timestamp, leaf_type).
1145
1146    Raises:
1147      AftlError: If the header cannot be decoded; or if the leaf type is
1148          unknown.
1149    """
1150    data_length = struct.calcsize(cls.FORMAT_STRING)
1151    try:
1152      (version, timestamp, leaf_type) = struct.unpack(
1153          cls.FORMAT_STRING, stream.read(data_length))
1154    except struct.error:
1155      raise AftlError("Not enough data to parse leaf header")
1156    try:
1157      leaf_type = cls.LeafType(leaf_type)
1158    except ValueError:
1159      raise AftlError("Unknown leaf type: {}".format(leaf_type))
1160    return version, timestamp, leaf_type
1161
1162  @classmethod
1163  @abc.abstractmethod
1164  def parse(cls, data):
1165    """Parses a leaf and returned a new object.
1166
1167    This abstract method must be implemented by the subclass. It may use
1168    _parse_header to parse the common fields.
1169
1170    Arguments:
1171      data: a bytes-like object.
1172
1173    Returns:
1174      An object of the type of the particular subclass.
1175
1176    Raises:
1177      AftlError: If the leaf type is incorrect; or if the decoding failed.
1178    """
1179
1180  @abc.abstractmethod
1181  def encode(self):
1182    """Encodes a leaf.
1183
1184    This abstract method must be implemented by the subclass. It may use
1185    _encode_header to encode the common fields.
1186
1187    Returns:
1188      A bytes with the encoded leaf.
1189
1190    Raises:
1191      AftlError: If the encoding failed.
1192    """
1193
1194  def _get_expected_header_size(self):
1195    """Returns the size of the leaf header."""
1196    return struct.calcsize(self.FORMAT_STRING)
1197
1198  def _encode_header(self, stream):
1199    """Encodes the header of the leaf.
1200
1201    This method is called by the encode method in the subclass.
1202
1203    Arguments:
1204      stream: a BytesIO to which the object is written.
1205
1206    Raises:
1207      AftlError: If the encoding failed.
1208    """
1209    try:
1210      stream.write(struct.pack(self.FORMAT_STRING, self.version, self.timestamp,
1211                               self.leaf_type.value))
1212    except struct.error:
1213      raise AftlError('Unable to encode the leaf header')
1214
1215  def print_desc(self, o):
1216    """Prints the leaf header.
1217
1218    Arguments:
1219      o: The object to write the output to.
1220    """
1221    i = ' ' * 6
1222    fmt = '{}{:23}{}\n'
1223    o.write(fmt.format(i, 'Version:', self.version))
1224    o.write(fmt.format(i, 'Timestamp:', self.timestamp))
1225    o.write(fmt.format(i, 'Type:', self.leaf_type))
1226
1227
1228class SignedVBMetaPrimaryAnnotationLeaf(Leaf):
1229  """A Signed VBMetaPrimaryAnnotation leaf."""
1230
1231  def __init__(self, version=1, timestamp=0,
1232               signed_vbmeta_primary_annotation=None):
1233    """Builds a new Signed VBMeta Primary Annotation leaf."""
1234    super(SignedVBMetaPrimaryAnnotationLeaf, self).__init__(
1235        version=version, timestamp=timestamp,
1236        leaf_type=self.LeafType.SignedVBMetaPrimaryAnnotationType)
1237    if not signed_vbmeta_primary_annotation:
1238      signed_vbmeta_primary_annotation = SignedVBMetaPrimaryAnnotation()
1239    self.signed_vbmeta_primary_annotation = signed_vbmeta_primary_annotation
1240
1241  @property
1242  def annotation(self):
1243    """Returns the VBMetaPrimaryAnnotation contained in the leaf."""
1244    return self.signed_vbmeta_primary_annotation.annotation
1245
1246  @property
1247  def signature(self):
1248    """Returns the Signature contained in the leaf."""
1249    return self.signed_vbmeta_primary_annotation.signature
1250
1251  @classmethod
1252  def parse(cls, data):
1253    """Parses an encoded contained in data.
1254
1255    Arguments:
1256      data: a bytes-like object.
1257
1258    Returns:
1259      A SignedVBMetaPrimaryAnnotationLeaf.
1260
1261    Raises:
1262      AftlError if the leaf type is incorrect; or if the decoding failed.
1263    """
1264    encoded_leaf = io.BytesIO(data)
1265    version, timestamp, leaf_type = Leaf._parse_header(encoded_leaf)
1266    if leaf_type != Leaf.LeafType.SignedVBMetaPrimaryAnnotationType:
1267      raise AftlError("Incorrect leaf type")
1268    signed_annotation = SignedVBMetaPrimaryAnnotation.parse(encoded_leaf)
1269    return cls(version=version, timestamp=timestamp,
1270               signed_vbmeta_primary_annotation=signed_annotation)
1271
1272  def get_expected_size(self):
1273    """Returns the size of the leaf."""
1274    size = self._get_expected_header_size()
1275    if self.signed_vbmeta_primary_annotation:
1276      size += self.signed_vbmeta_primary_annotation.get_expected_size()
1277    return size
1278
1279  def encode(self):
1280    """Encodes the leaf.
1281
1282    Returns:
1283      bytes which contains the encoded leaf.
1284
1285    Raises:
1286      AftlError: If the encoding failed.
1287    """
1288    stream = io.BytesIO()
1289    self._encode_header(stream)
1290    self.signed_vbmeta_primary_annotation.encode(stream)
1291    return stream.getvalue()
1292
1293  def print_desc(self, o):
1294    """Prints the leaf.
1295
1296    Arguments:
1297      o: The object to write the output to.
1298    """
1299    i = ' ' * 4
1300    fmt = '{}{:25}{}\n'
1301    o.write(fmt.format(i, 'Leaf:', ''))
1302    super(SignedVBMetaPrimaryAnnotationLeaf, self).print_desc(o)
1303    self.signed_vbmeta_primary_annotation.print_desc(o)
1304
1305
1306class AftlImage(object):
1307  """A class for the AFTL image, which contains the transparency log ICPs.
1308
1309  This encapsulates an AFTL ICP section with all information required to
1310  validate an inclusion proof.
1311
1312  Attributes:
1313    image_header: A header for the section.
1314    icp_entries: A list of AftlIcpEntry objects representing the inclusion
1315        proofs.
1316  """
1317
1318  def __init__(self, data=None):
1319    """Initializes a new AftlImage section.
1320
1321    Arguments:
1322      data: If not None, must be a bytearray representing an AftlImage.
1323
1324    Raises:
1325      AftlError: If the data does not represent a well-formed AftlImage.
1326    """
1327    if data:
1328      image_header_bytes = data[0:AftlImageHeader.SIZE]
1329      self.image_header = AftlImageHeader(image_header_bytes)
1330      if not self.image_header.is_valid():
1331        raise AftlError('Invalid AftlImageHeader.')
1332      icp_count = self.image_header.icp_count
1333
1334      # Jump past the header for entry deserialization.
1335      icp_index = AftlImageHeader.SIZE
1336      # Validate each entry.
1337      self.icp_entries = []
1338      # add_icp_entry() updates entries and header, so set header count to
1339      # compensate.
1340      self.image_header.icp_count = 0
1341      for i in range(icp_count):
1342        # Get the entry header from the AftlImage.
1343        cur_icp_entry = AftlIcpEntry(data[icp_index:])
1344        cur_icp_entry_size = cur_icp_entry.get_expected_size()
1345        # Now validate the entry structure.
1346        if not cur_icp_entry.is_valid():
1347          raise AftlError('Validation of ICP entry {} failed.'.format(i))
1348        self.add_icp_entry(cur_icp_entry)
1349        icp_index += cur_icp_entry_size
1350    else:
1351      self.image_header = AftlImageHeader()
1352      self.icp_entries = []
1353    if not self.is_valid():
1354      raise AftlError('Invalid AftlImage.')
1355
1356  def add_icp_entry(self, icp_entry):
1357    """Adds a new AftlIcpEntry to the AftlImage, updating fields as needed.
1358
1359    Arguments:
1360      icp_entry: An AftlIcpEntry structure.
1361    """
1362    self.icp_entries.append(icp_entry)
1363    self.image_header.icp_count += 1
1364    self.image_header.aftl_image_size += icp_entry.get_expected_size()
1365
1366  def verify_vbmeta_image(self, vbmeta_image, transparency_log_pub_keys):
1367    """Verifies the contained inclusion proof given the public log key.
1368
1369    Arguments:
1370      vbmeta_image: The vbmeta_image that should be verified against the
1371        inclusion proof.
1372      transparency_log_pub_keys: List of paths to PEM files containing trusted
1373        public keys that correspond with the transparency_logs.
1374
1375    Returns:
1376      True if all the inclusion proofs in the AfltDescriptor validate, are
1377      signed by one of the give transparency log public keys; otherwise false.
1378    """
1379    if not transparency_log_pub_keys or not self.icp_entries:
1380      return False
1381
1382    icp_verified = 0
1383    for icp_entry in self.icp_entries:
1384      verified = False
1385      for pub_key in transparency_log_pub_keys:
1386        if icp_entry.verify_vbmeta_image(vbmeta_image, pub_key):
1387          verified = True
1388          break
1389      if verified:
1390        icp_verified += 1
1391    return icp_verified == len(self.icp_entries)
1392
1393  def encode(self):
1394    """Serialize the AftlImage to a bytearray().
1395
1396    Returns:
1397      A bytearray() with the encoded AFTL image.
1398
1399    Raises:
1400      AftlError: If invalid AFTL image structure.
1401    """
1402    # The header and entries are guaranteed to be valid when encode is called.
1403    # Check the entire structure as a whole.
1404    if not self.is_valid():
1405      raise AftlError('Invalid AftlImage structure.')
1406
1407    aftl_image = bytearray()
1408    aftl_image.extend(self.image_header.encode())
1409    for icp_entry in self.icp_entries:
1410      aftl_image.extend(icp_entry.encode())
1411    return aftl_image
1412
1413  def is_valid(self):
1414    """Ensures that values in the AftlImage are sane.
1415
1416    Returns:
1417      True if the values in the AftlImage are sane, False otherwise.
1418    """
1419    if not self.image_header.is_valid():
1420      return False
1421
1422    if self.image_header.icp_count != len(self.icp_entries):
1423      return False
1424
1425    for icp_entry in self.icp_entries:
1426      if not icp_entry.is_valid():
1427        return False
1428    return True
1429
1430  def print_desc(self, o):
1431    """Print the AFTL image.
1432
1433    Arguments:
1434      o: The object to write the output to.
1435    """
1436    o.write('Android Firmware Transparency Image:\n')
1437    self.image_header.print_desc(o)
1438    for i, icp_entry in enumerate(self.icp_entries):
1439      o.write('  Entry #{}:\n'.format(i + 1))
1440      icp_entry.print_desc(o)
1441
1442
1443class AftlCommunication(object):
1444  """Class to abstract the communication layer with the transparency log."""
1445
1446  def __init__(self, transparency_log_config, timeout):
1447    """Initializes the object.
1448
1449    Arguments:
1450      transparency_log_config: A TransparencyLogConfig instance.
1451      timeout: Duration in seconds before requests to the AFTL times out. A
1452        value of 0 or None means there will be no timeout.
1453    """
1454    self.transparency_log_config = transparency_log_config
1455    if timeout:
1456      self.timeout = timeout
1457    else:
1458      self.timeout = None
1459
1460  def add_vbmeta(self, request):
1461    """Calls the AddVBMeta RPC on the AFTL server.
1462
1463    Arguments:
1464      request: An AddVBMetaRequest message.
1465
1466    Returns:
1467      An AddVBMetaResponse message.
1468
1469    Raises:
1470      AftlError: If grpc or the proto modules cannot be loaded, if there is an
1471        error communicating with the log.
1472    """
1473    raise NotImplementedError(
1474        'add_vbmeta() needs to be implemented by subclass.')
1475
1476
1477class AftlGrpcCommunication(AftlCommunication):
1478  """Class that implements GRPC communication to the AFTL server."""
1479
1480  def add_vbmeta(self, request):
1481    """Calls the AddVBMeta RPC on the AFTL server.
1482
1483    Arguments:
1484      request: An AddVBMetaRequest message.
1485
1486    Returns:
1487      An AddVBMetaResponse message.
1488
1489    Raises:
1490      AftlError: If grpc or the proto modules cannot be loaded, if there is an
1491        error communicating with the log.
1492    """
1493    # Import grpc now to avoid global dependencies as it otherwise breaks
1494    # running unittest with atest.
1495    try:
1496      import grpc  # pylint: disable=import-outside-toplevel
1497      from proto import api_pb2_grpc # pylint: disable=import-outside-toplevel
1498    except ImportError as e:
1499      err_str = 'grpc can be installed with python pip install grpcio.\n'
1500      raise AftlError('Failed to import module: ({}).\n{}'.format(e, err_str))
1501
1502    # Set up the gRPC channel with the transparency log.
1503    sys.stdout.write('Preparing to request inclusion proof from {}. This could '
1504                     'take ~30 seconds for the process to complete.\n'.format(
1505                         self.transparency_log_config.target))
1506    channel = grpc.insecure_channel(self.transparency_log_config.target)
1507    stub = api_pb2_grpc.AFTLogStub(channel)
1508
1509    metadata = []
1510    if self.transparency_log_config.api_key:
1511      metadata.append(('x-api-key', self.transparency_log_config.api_key))
1512
1513    # Attempt to transmit to the transparency log.
1514    sys.stdout.write('ICP is about to be requested from transparency log '
1515                     'with domain {}.\n'.format(
1516                         self.transparency_log_config.target))
1517    try:
1518      response = stub.AddVBMeta(request, timeout=self.timeout,
1519                                metadata=metadata)
1520    except grpc.RpcError as e:
1521      raise AftlError('Error: grpc failure ({})'.format(e))
1522    return response
1523
1524
1525class Aftl(avbtool.Avb):
1526  """Business logic for aftltool command-line tool."""
1527
1528  def get_vbmeta_image(self, image_filename):
1529    """Gets the VBMeta struct bytes from image.
1530
1531    Arguments:
1532      image_filename: Image file to get information from.
1533
1534    Returns:
1535      A tuple with following elements:
1536        1. A bytearray with the vbmeta structure or None if the file does not
1537           contain a VBMeta structure.
1538        2. The VBMeta image footer.
1539    """
1540    # Reads and parses the vbmeta image.
1541    try:
1542      image = avbtool.ImageHandler(image_filename, read_only=True)
1543    except (IOError, ValueError) as e:
1544      sys.stderr.write('The image does not contain a valid VBMeta structure: '
1545                       '{}.\n'.format(e))
1546      return None, None
1547
1548    try:
1549      (footer, header, _, _) = self._parse_image(image)
1550    except avbtool.AvbError as e:
1551      sys.stderr.write('The image cannot be parsed: {}.\n'.format(e))
1552      return None, None
1553
1554    # Seeks for the start of the vbmeta image and calculates its size.
1555    offset = 0
1556    if footer:
1557      offset = footer.vbmeta_offset
1558    vbmeta_image_size = (offset + header.SIZE
1559                         + header.authentication_data_block_size
1560                         + header.auxiliary_data_block_size)
1561
1562    # Reads the vbmeta image bytes.
1563    try:
1564      image.seek(offset)
1565    except RuntimeError as e:
1566      sys.stderr.write('Given vbmeta image offset is invalid: {}.\n'.format(e))
1567      return None, None
1568    return image.read(vbmeta_image_size), footer
1569
1570  def get_aftl_image(self, image_filename):
1571    """Gets the AftlImage from image.
1572
1573    Arguments:
1574      image_filename: Image file to get information from.
1575
1576    Returns:
1577      An AftlImage or None if the file does not contain a AftlImage.
1578    """
1579    # Reads the vbmeta image bytes.
1580    vbmeta_image, _ = self.get_vbmeta_image(image_filename)
1581    if not vbmeta_image:
1582      return None
1583
1584    try:
1585      image = avbtool.ImageHandler(image_filename, read_only=True)
1586    except ValueError as e:
1587      sys.stderr.write('The image does not contain a valid VBMeta structure: '
1588                       '{}.\n'.format(e))
1589      return None
1590
1591    # Seeks for the start of the AftlImage.
1592    try:
1593      image.seek(len(vbmeta_image))
1594    except RuntimeError as e:
1595      sys.stderr.write('Given AftlImage image offset is invalid: {}.\n'
1596                       .format(e))
1597      return None
1598
1599    # Parses the header for the AftlImage size.
1600    tmp_header_bytes = image.read(AftlImageHeader.SIZE)
1601    if not tmp_header_bytes or len(tmp_header_bytes) != AftlImageHeader.SIZE:
1602      sys.stderr.write('This image does not contain an AftlImage.\n')
1603      return None
1604
1605    try:
1606      tmp_header = AftlImageHeader(tmp_header_bytes)
1607    except AftlError as e:
1608      sys.stderr.write('This image does not contain a valid AftlImage: '
1609                       '{}.\n'.format(e))
1610      return None
1611
1612    # Resets to the beginning of the AftlImage.
1613    try:
1614      image.seek(len(vbmeta_image))
1615    except RuntimeError as e:
1616      sys.stderr.write('Given AftlImage image offset is invalid: {}.\n'
1617                       .format(e))
1618      return None
1619
1620    # Parses the full AftlImage.
1621    aftl_image_bytes = image.read(tmp_header.aftl_image_size)
1622    aftl_image = None
1623    try:
1624      aftl_image = AftlImage(aftl_image_bytes)
1625    except AftlError as e:
1626      sys.stderr.write('The image does not contain a valid AftlImage: '
1627                       '{}.\n'.format(e))
1628    return aftl_image
1629
1630  def info_image_icp(self, vbmeta_image_path, output):
1631    """Implements the 'info_image_icp' command.
1632
1633    Arguments:
1634      vbmeta_image_path: Image file to get information from.
1635      output: Output file to write human-readable information to (file object).
1636
1637    Returns:
1638      True if the given image has an AftlImage and could successfully
1639      be processed; otherwise False.
1640    """
1641    aftl_image = self.get_aftl_image(vbmeta_image_path)
1642    if not aftl_image:
1643      return False
1644    aftl_image.print_desc(output)
1645    return True
1646
1647  def verify_image_icp(self, vbmeta_image_path, transparency_log_pub_keys,
1648                       output):
1649    """Implements the 'verify_image_icp' command.
1650
1651    Arguments:
1652      vbmeta_image_path: Image file to get information from.
1653      transparency_log_pub_keys: List of paths to PEM files containing trusted
1654        public keys that correspond with the transparency_logs.
1655      output: Output file to write human-readable information to (file object).
1656
1657    Returns:
1658      True if for the given image the inclusion proof validates; otherwise
1659      False.
1660    """
1661    vbmeta_image, _ = self.get_vbmeta_image(vbmeta_image_path)
1662    aftl_image = self.get_aftl_image(vbmeta_image_path)
1663    if not aftl_image or not vbmeta_image:
1664      return False
1665    verified = aftl_image.verify_vbmeta_image(vbmeta_image,
1666                                              transparency_log_pub_keys)
1667    if not verified:
1668      output.write('The inclusion proofs for the image do not validate.\n')
1669      return False
1670    output.write('The inclusion proofs for the image successfully validate.\n')
1671    return True
1672
1673  def request_inclusion_proof(self, transparency_log_config, vbmeta_image,
1674                              version_inc, manufacturer_key_path,
1675                              signing_helper, signing_helper_with_files,
1676                              timeout, aftl_comms=None):
1677    """Packages and sends a request to the specified transparency log.
1678
1679    Arguments:
1680      transparency_log_config: A TransparencyLogConfig instance.
1681      vbmeta_image: A bytearray with the VBMeta image.
1682      version_inc: Subcomponent of the build fingerprint.
1683      manufacturer_key_path: Path to key used to sign messages sent to the
1684        transparency log servers.
1685      signing_helper: Program which signs a hash and returns a signature.
1686      signing_helper_with_files: Same as signing_helper but uses files instead.
1687      timeout: Duration in seconds before requests to the transparency log
1688        timeout.
1689      aftl_comms: A subclass of the AftlCommunication class. The default is
1690        to use AftlGrpcCommunication.
1691
1692    Returns:
1693      An AftlIcpEntry with the inclusion proof for the log entry.
1694
1695    Raises:
1696      AftlError: If grpc or the proto modules cannot be loaded, if there is an
1697         error communicating with the log, if the manufacturer_key_path
1698         cannot be decoded, or if the log submission cannot be signed.
1699    """
1700    # Calculate the hash of the vbmeta image.
1701    vbmeta_hash = hashlib.sha256(vbmeta_image).digest()
1702
1703    # Extract the key data from the PEM file if of size 4096.
1704    manufacturer_key = avbtool.RSAPublicKey(manufacturer_key_path)
1705    if manufacturer_key.num_bits != 4096:
1706      raise AftlError('Manufacturer keys not of size 4096: {}'.format(
1707          manufacturer_key.num_bits))
1708    manufacturer_key_data = rsa_key_read_pem_bytes(manufacturer_key_path)
1709
1710    # Calculate the hash of the manufacturer key data.
1711    m_key_hash = hashlib.sha256(manufacturer_key_data).digest()
1712
1713    # Build VBMetaPrimaryAnnotation with that data.
1714    annotation = VBMetaPrimaryAnnotation(
1715        vbmeta_hash=vbmeta_hash, version_incremental=version_inc,
1716        manufacturer_key_hash=m_key_hash)
1717
1718    # Sign annotation and add it to the request.
1719    signed_annotation = annotation.sign(
1720        manufacturer_key_path, signing_helper=signing_helper,
1721        signing_helper_with_files=signing_helper_with_files)
1722
1723    encoded_signed_annotation = io.BytesIO()
1724    signed_annotation.encode(encoded_signed_annotation)
1725    request = api_pb2.AddVBMetaRequest(
1726        vbmeta=vbmeta_image,
1727        signed_vbmeta_primary_annotation=encoded_signed_annotation.getvalue())
1728
1729    # Submit signed VBMeta annotation to the server.
1730    if not aftl_comms:
1731      aftl_comms = AftlGrpcCommunication(transparency_log_config, timeout)
1732    response = aftl_comms.add_vbmeta(request)
1733
1734    # Return an AftlIcpEntry representing this response.
1735    icp_entry = AftlIcpEntry()
1736    icp_entry.translate_response(transparency_log_config.target, response)
1737    return icp_entry
1738
1739  def make_icp_from_vbmeta(self, vbmeta_image_path, output,
1740                           signing_helper, signing_helper_with_files,
1741                           version_incremental, transparency_log_configs,
1742                           manufacturer_key, padding_size, timeout):
1743    """Generates a vbmeta image with inclusion proof given a vbmeta image.
1744
1745    The AftlImage contains the information required to validate an inclusion
1746    proof for a specific vbmeta image. It consists of a header (struct
1747    AftlImageHeader) and zero or more entry structures (struct AftlIcpEntry)
1748    that contain the vbmeta leaf hash, tree size, root hash, inclusion proof
1749    hashes, and the signature for the root hash.
1750
1751    The vbmeta image, its hash, the version_incremental part of the build
1752    fingerprint, and the hash of the manufacturer key are sent to the
1753    transparency log, with the message signed by the manufacturer key.
1754    An inclusion proof is calculated and returned. This inclusion proof is
1755    then packaged in an AftlImage structure. The existing vbmeta data is
1756    copied to a new file, appended with the AftlImage data, and written to
1757    output. Validation of the inclusion proof does not require
1758    communication with the transparency log.
1759
1760    Arguments:
1761      vbmeta_image_path: Path to a vbmeta image file.
1762      output: File to write the results to.
1763      signing_helper: Program which signs a hash and returns a signature.
1764      signing_helper_with_files: Same as signing_helper but uses files instead.
1765      version_incremental: A string representing the subcomponent of the
1766        build fingerprint used to identify the vbmeta in the transparency log.
1767      transparency_log_configs: List of TransparencyLogConfig used to request
1768        the inclusion proofs.
1769      manufacturer_key: Path to PEM file containting the key file used to sign
1770        messages sent to the transparency log servers.
1771      padding_size: If not 0, pads output so size is a multiple of the number.
1772      timeout: Duration in seconds before requests to the AFTL times out. A
1773        value of 0 or None means there will be no timeout.
1774
1775    Returns:
1776      True if the inclusion proofs could be fetched from the transparency log
1777      servers and could be successfully validated; otherwise False.
1778    """
1779    # Retrieves vbmeta structure from given partition image.
1780    vbmeta_image, footer = self.get_vbmeta_image(vbmeta_image_path)
1781
1782    # Fetches inclusion proofs for vbmeta structure from all transparency logs.
1783    aftl_image = AftlImage()
1784    for log_config in transparency_log_configs:
1785      try:
1786        icp_entry = self.request_inclusion_proof(log_config, vbmeta_image,
1787                                                 version_incremental,
1788                                                 manufacturer_key,
1789                                                 signing_helper,
1790                                                 signing_helper_with_files,
1791                                                 timeout)
1792        if not icp_entry.verify_vbmeta_image(vbmeta_image, log_config.pub_key):
1793          sys.stderr.write('The inclusion proof from {} could not be verified.'
1794                           '\n'.format(log_config.target))
1795        aftl_image.add_icp_entry(icp_entry)
1796      except AftlError as e:
1797        # The inclusion proof request failed. Continue and see if others will.
1798        sys.stderr.write('Requesting inclusion proof failed: {}.\n'.format(e))
1799        continue
1800
1801    # Checks that the resulting AftlImage is sane.
1802    if aftl_image.image_header.icp_count != len(transparency_log_configs):
1803      sys.stderr.write('Valid inclusion proofs could only be retrieved from {} '
1804                       'out of {} transparency logs.\n'
1805                       .format(aftl_image.image_header.icp_count,
1806                               len(transparency_log_configs)))
1807      return False
1808    if not aftl_image.is_valid():
1809      sys.stderr.write('Resulting AftlImage structure is malformed.\n')
1810      return False
1811    keys = [log.pub_key for log in transparency_log_configs]
1812    if not aftl_image.verify_vbmeta_image(vbmeta_image, keys):
1813      sys.stderr.write('Resulting AftlImage inclusion proofs do not '
1814                       'validate.\n')
1815      return False
1816
1817    # Writes original VBMeta image, followed by the AftlImage into the output.
1818    if footer:  # Checks if it is a chained partition.
1819      # TODO(b/147217370): Determine the best way to handle chained partitions
1820      # like the system.img. Currently, we only put the main vbmeta.img in the
1821      # transparency log.
1822      sys.stderr.write('Image has a footer and ICP for this format is not '
1823                       'implemented.\n')
1824      return False
1825
1826    output.seek(0)
1827    output.write(vbmeta_image)
1828    encoded_aftl_image = aftl_image.encode()
1829    output.write(encoded_aftl_image)
1830
1831    if padding_size > 0:
1832      total_image_size = len(vbmeta_image) + len(encoded_aftl_image)
1833      padded_size = avbtool.round_to_multiple(total_image_size, padding_size)
1834      padding_needed = padded_size - total_image_size
1835      output.write('\0' * padding_needed)
1836
1837    sys.stdout.write('VBMeta image with AFTL image successfully created.\n')
1838    return True
1839
1840  def _load_test_process_function(self, vbmeta_image_path,
1841                                  transparency_log_config,
1842                                  manufacturer_key,
1843                                  process_number, submission_count,
1844                                  preserve_icp_images, timeout, result_queue):
1845    """Function to be used by multiprocessing.Process.
1846
1847    Arguments:
1848      vbmeta_image_path: Path to a vbmeta image file.
1849      transparency_log_config: A TransparencyLogConfig instance used to request
1850        an inclusion proof.
1851      manufacturer_key: Path to PEM file containting the key file used to sign
1852        messages sent to the transparency log servers.
1853      process_number: The number of the processes executing the function.
1854      submission_count: Number of total submissions to perform per
1855        process_count.
1856      preserve_icp_images: Boolean to indicate if the generated vbmeta image
1857        files with inclusion proofs should be preserved in the temporary
1858        directory.
1859      timeout: Duration in seconds before requests to the AFTL times out. A
1860        value of 0 or None means there will be no timeout.
1861      result_queue: Multiprocessing.Queue object for posting execution results.
1862    """
1863    for count in range(0, submission_count):
1864      version_incremental = 'aftl_load_testing_{}_{}'.format(process_number,
1865                                                             count)
1866      output_file = os.path.join(tempfile.gettempdir(),
1867                                 '{}_icp.img'.format(version_incremental))
1868      output = open(output_file, 'wb')
1869
1870      # Instrumented section.
1871      start_time = time.time()
1872      result = self.make_icp_from_vbmeta(
1873          vbmeta_image_path=vbmeta_image_path,
1874          output=output,
1875          signing_helper=None,
1876          signing_helper_with_files=None,
1877          version_incremental=version_incremental,
1878          transparency_log_configs=[transparency_log_config],
1879          manufacturer_key=manufacturer_key,
1880          padding_size=0,
1881          timeout=timeout)
1882      end_time = time.time()
1883
1884      output.close()
1885      if not preserve_icp_images:
1886        os.unlink(output_file)
1887
1888      # Puts the result onto the result queue.
1889      execution_time = end_time - start_time
1890      result_queue.put((start_time, end_time, execution_time,
1891                        version_incremental, result))
1892
1893  def load_test_aftl(self, vbmeta_image_path, output, transparency_log_config,
1894                     manufacturer_key,
1895                     process_count, submission_count, stats_filename,
1896                     preserve_icp_images, timeout):
1897    """Performs multi-threaded load test on a given AFTL and prints stats.
1898
1899    Arguments:
1900      vbmeta_image_path: Path to a vbmeta image file.
1901      output: File to write the report to.
1902      transparency_log_config: A TransparencyLogConfig used to request an
1903        inclusion proof.
1904      manufacturer_key: Path to PEM file containting the key file used to sign
1905        messages sent to the transparency log servers.
1906      process_count: Number of processes used for parallel testing.
1907      submission_count: Number of total submissions to perform per
1908        process_count.
1909      stats_filename: Path to the stats file to write the raw execution data to.
1910        If None, it will be written to the temp directory.
1911      preserve_icp_images: Boolean to indicate if the generated vbmeta
1912        image files with inclusion proofs should preserved.
1913      timeout: Duration in seconds before requests to the AFTL times out. A
1914        value of 0 or None means there will be no timeout.
1915
1916    Returns:
1917      True if the load tested succeeded without errors; otherwise False.
1918    """
1919    if process_count < 1 or submission_count < 1:
1920      sys.stderr.write('Values for --processes/--submissions '
1921                       'must be at least 1.\n')
1922      return False
1923
1924    if not stats_filename:
1925      stats_filename = os.path.join(
1926          tempfile.gettempdir(),
1927          'load_test_p{}_s{}.csv'.format(process_count, submission_count))
1928
1929    stats_file = None
1930    try:
1931      stats_file = open(stats_filename, 'wt')
1932      stats_file.write('start_time,end_time,execution_time,version_incremental,'
1933                       'result\n')
1934    except IOError as e:
1935      sys.stderr.write('Could not open stats file {}: {}.\n'
1936                       .format(stats_file, e))
1937      return False
1938
1939    # Launch all the processes with their workloads.
1940    result_queue = multiprocessing.Queue()
1941    processes = set()
1942    execution_times = []
1943    results = []
1944    for i in range(0, process_count):
1945      p = multiprocessing.Process(
1946          target=self._load_test_process_function,
1947          args=(vbmeta_image_path, transparency_log_config,
1948                manufacturer_key, i, submission_count,
1949                preserve_icp_images, timeout, result_queue))
1950      p.start()
1951      processes.add(p)
1952
1953    while processes:
1954      # Processes the results queue and writes these to a stats file.
1955      try:
1956        (start_time, end_time, execution_time, version_incremental,
1957         result) = result_queue.get(timeout=1)
1958        stats_file.write('{},{},{},{},{}\n'.format(start_time, end_time,
1959                                                   execution_time,
1960                                                   version_incremental, result))
1961        execution_times.append(execution_time)
1962        results.append(result)
1963      except queue.Empty:
1964        pass
1965
1966      # Checks if processes are still alive; if not clean them up. join() would
1967      # have been nicer but we want to continously stream out the stats to file.
1968      for p in processes.copy():
1969        if not p.is_alive():
1970          processes.remove(p)
1971
1972    # Prepares stats.
1973    executions = sorted(execution_times)
1974    execution_count = len(execution_times)
1975    median = 0
1976
1977    # pylint: disable=old-division
1978    if execution_count % 2 == 0:
1979      median = (executions[execution_count // 2 - 1]
1980                + executions[execution_count // 2]) / 2
1981    else:
1982      median = executions[execution_count // 2]
1983
1984    # Outputs the stats report.
1985    o = output
1986    o.write('Load testing results:\n')
1987    o.write('  Processes:               {}\n'.format(process_count))
1988    o.write('  Submissions per process: {}\n'.format(submission_count))
1989    o.write('\n')
1990    o.write('  Submissions:\n')
1991    o.write('    Total:                 {}\n'.format(len(executions)))
1992    o.write('    Succeeded:             {}\n'.format(results.count(True)))
1993    o.write('    Failed:                {}\n'.format(results.count(False)))
1994    o.write('\n')
1995    o.write('  Submission execution durations:\n')
1996    o.write('    Average:               {:.2f} sec\n'.format(
1997        sum(execution_times) / execution_count))
1998    o.write('    Median:                {:.2f} sec\n'.format(median))
1999    o.write('    Min:                   {:.2f} sec\n'.format(min(executions)))
2000    o.write('    Max:                   {:.2f} sec\n'.format(max(executions)))
2001
2002    # Close the stats file.
2003    stats_file.close()
2004    if results.count(False):
2005      return False
2006    return True
2007
2008
2009class TransparencyLogConfig(object):
2010  """Class that gathers the fields representing a transparency log.
2011
2012  Attributes:
2013    target: The hostname and port of the server in hostname:port format.
2014    pub_key: A PEM file that contains the public key of the transparency
2015      log server.
2016    api_key: The API key to use to interact with the transparency log
2017      server.
2018  """
2019
2020  @staticmethod
2021  def from_argument(arg):
2022    """Build an object from a command line argument string.
2023
2024    Arguments:
2025      arg: The transparency log as passed in the command line argument.
2026        It must be in the format: host:port,key_file[,api_key].
2027
2028    Returns:
2029      The TransparencyLogConfig instance.
2030
2031    Raises:
2032      argparse.ArgumentTypeError: If the format of arg is invalid.
2033    """
2034    api_key = None
2035    try:
2036      target, pub_key, *rest = arg.split(",", maxsplit=2)
2037    except ValueError:
2038      raise argparse.ArgumentTypeError("incorrect format for transparency log "
2039                                       "server, expected "
2040                                       "host:port,publickey_file.")
2041    if not target:
2042      raise argparse.ArgumentTypeError("incorrect format for transparency log "
2043                                       "server: host:port cannot be empty.")
2044    if not pub_key:
2045      raise argparse.ArgumentTypeError("incorrect format for transparency log "
2046                                       "server: publickey_file cannot be "
2047                                       "empty.")
2048    if rest:
2049      api_key = rest[0]
2050    return TransparencyLogConfig(target, pub_key, api_key)
2051
2052  def __init__(self, target, pub_key, api_key=None):
2053    """Initializes a new TransparencyLogConfig object."""
2054    self.target = target
2055    self.pub_key = pub_key
2056    self.api_key = api_key
2057
2058
2059class AftlTool(avbtool.AvbTool):
2060  """Object for aftltool command-line tool."""
2061
2062  def __init__(self):
2063    """Initializer method."""
2064    self.aftl = Aftl()
2065    super(AftlTool, self).__init__()
2066
2067  def make_icp_from_vbmeta(self, args):
2068    """Implements the 'make_icp_from_vbmeta' sub-command."""
2069    args = self._fixup_common_args(args)
2070    return self.aftl.make_icp_from_vbmeta(args.vbmeta_image_path,
2071                                          args.output,
2072                                          args.signing_helper,
2073                                          args.signing_helper_with_files,
2074                                          args.version_incremental,
2075                                          args.transparency_log_servers,
2076                                          args.manufacturer_key,
2077                                          args.padding_size,
2078                                          args.timeout)
2079
2080  def info_image_icp(self, args):
2081    """Implements the 'info_image_icp' sub-command."""
2082    return self.aftl.info_image_icp(args.vbmeta_image_path.name, args.output)
2083
2084  def verify_image_icp(self, args):
2085    """Implements the 'verify_image_icp' sub-command."""
2086    return self.aftl.verify_image_icp(args.vbmeta_image_path.name,
2087                                      args.transparency_log_pub_keys,
2088                                      args.output)
2089
2090  def load_test_aftl(self, args):
2091    """Implements the 'load_test_aftl' sub-command."""
2092    return self.aftl.load_test_aftl(args.vbmeta_image_path,
2093                                    args.output,
2094                                    args.transparency_log_server,
2095                                    args.manufacturer_key,
2096                                    args.processes,
2097                                    args.submissions,
2098                                    args.stats_file,
2099                                    args.preserve_icp_images,
2100                                    args.timeout)
2101
2102  def run(self, argv):
2103    """Command-line processor.
2104
2105    Arguments:
2106      argv: Pass sys.argv from main.
2107    """
2108    parser = argparse.ArgumentParser()
2109    subparsers = parser.add_subparsers(title='subcommands')
2110
2111    # Command: make_icp_from_vbmeta
2112    sub_parser = subparsers.add_parser('make_icp_from_vbmeta',
2113                                       help='Makes an ICP enhanced vbmeta image'
2114                                       ' from an existing vbmeta image.')
2115    sub_parser.add_argument('--output',
2116                            help='Output file name.',
2117                            type=argparse.FileType('wb'),
2118                            default=sys.stdout)
2119    sub_parser.add_argument('--vbmeta_image_path',
2120                            help='Path to a generate vbmeta image file.',
2121                            required=True)
2122    sub_parser.add_argument('--version_incremental',
2123                            help='Current build ID.',
2124                            required=True)
2125    sub_parser.add_argument('--manufacturer_key',
2126                            help='Path to the PEM file containing the '
2127                            'manufacturer key for use with the log.',
2128                            required=True)
2129    sub_parser.add_argument('--transparency_log_servers',
2130                            help='List of transparency log servers in '
2131                            'host:port,publickey_file[,api_key] format. The '
2132                            'publickey_file must be in the PEM format.',
2133                            nargs='+', type=TransparencyLogConfig.from_argument)
2134    sub_parser.add_argument('--padding_size',
2135                            metavar='NUMBER',
2136                            help='If non-zero, pads output with NUL bytes so '
2137                            'its size is a multiple of NUMBER (default: 0)',
2138                            type=avbtool.parse_number,
2139                            default=0)
2140    sub_parser.add_argument('--timeout',
2141                            metavar='SECONDS',
2142                            help='Timeout in seconds for transparency log '
2143                            'requests (default: 600 sec). A value of 0 means '
2144                            'no timeout.',
2145                            type=avbtool.parse_number,
2146                            default=600)
2147    self._add_common_args(sub_parser)
2148    sub_parser.set_defaults(func=self.make_icp_from_vbmeta)
2149
2150    # Command: info_image_icp
2151    sub_parser = subparsers.add_parser(
2152        'info_image_icp',
2153        help='Show information about AFTL ICPs in vbmeta or footer.')
2154    sub_parser.add_argument('--vbmeta_image_path',
2155                            help='Path to vbmeta image for AFTL information.',
2156                            type=argparse.FileType('rb'),
2157                            required=True)
2158    sub_parser.add_argument('--output',
2159                            help='Write info to file',
2160                            type=argparse.FileType('wt'),
2161                            default=sys.stdout)
2162    sub_parser.set_defaults(func=self.info_image_icp)
2163
2164    # Arguments for verify_image_icp.
2165    sub_parser = subparsers.add_parser(
2166        'verify_image_icp',
2167        help='Verify AFTL ICPs in vbmeta or footer.')
2168
2169    sub_parser.add_argument('--vbmeta_image_path',
2170                            help='Image to verify the inclusion proofs.',
2171                            type=argparse.FileType('rb'),
2172                            required=True)
2173    sub_parser.add_argument('--transparency_log_pub_keys',
2174                            help='Paths to PEM files containing transparency '
2175                            'log server key(s). This must not be None.',
2176                            nargs='*',
2177                            required=True)
2178    sub_parser.add_argument('--output',
2179                            help='Write info to file',
2180                            type=argparse.FileType('wt'),
2181                            default=sys.stdout)
2182    sub_parser.set_defaults(func=self.verify_image_icp)
2183
2184    # Command: load_test_aftl
2185    sub_parser = subparsers.add_parser(
2186        'load_test_aftl',
2187        help='Perform load testing against one AFTL log server. Note: This MUST'
2188        ' not be performed against a production system.')
2189    sub_parser.add_argument('--vbmeta_image_path',
2190                            help='Path to a generate vbmeta image file.',
2191                            required=True)
2192    sub_parser.add_argument('--output',
2193                            help='Write report to file.',
2194                            type=argparse.FileType('wt'),
2195                            default=sys.stdout)
2196    sub_parser.add_argument('--manufacturer_key',
2197                            help='Path to the PEM file containing the '
2198                            'manufacturer key for use with the log.',
2199                            required=True)
2200    sub_parser.add_argument('--transparency_log_server',
2201                            help='Transparency log server to test against in '
2202                            'host:port,publickey_file[,api_key] format. The '
2203                            'publickey_file must be in the PEM format.',
2204                            required=True,
2205                            type=TransparencyLogConfig.from_argument)
2206    sub_parser.add_argument('--processes',
2207                            help='Number of parallel processes to use for '
2208                            'testing (default: 1).',
2209                            type=avbtool.parse_number,
2210                            default=1)
2211    sub_parser.add_argument('--submissions',
2212                            help='Number of submissions to perform against the '
2213                            'log per process (default: 1).',
2214                            type=avbtool.parse_number,
2215                            default=1)
2216    sub_parser.add_argument('--stats_file',
2217                            help='Path to the stats file to write the raw '
2218                            'execution data to (Default: '
2219                            'load_test_p[processes]_s[submissions].csv.')
2220    sub_parser.add_argument('--preserve_icp_images',
2221                            help='Boolean flag to indicate if the generated '
2222                            'vbmeta image files with inclusion proofs should '
2223                            'preserved.',
2224                            action='store_true')
2225    sub_parser.add_argument('--timeout',
2226                            metavar='SECONDS',
2227                            help='Timeout in seconds for transparency log '
2228                            'requests (default: 0). A value of 0 means '
2229                            'no timeout.',
2230                            type=avbtool.parse_number,
2231                            default=0)
2232    sub_parser.set_defaults(func=self.load_test_aftl)
2233
2234    args = parser.parse_args(argv[1:])
2235    if not 'func' in args:
2236      # This error gets raised when the command line tool is called without any
2237      # arguments. It mimics the original Python 2 behavior.
2238      parser.print_usage()
2239      print('aftltool: error: too few arguments')
2240      sys.exit(2)
2241    try:
2242      success = args.func(args)
2243    except AftlError as e:
2244      # Signals to calling tools that an unhandled exception occured.
2245      sys.stderr.write('Unhandled AftlError occured: {}\n'.format(e))
2246      sys.exit(2)
2247
2248    if not success:
2249      # Signals to calling tools that the command has failed.
2250      sys.exit(1)
2251
2252if __name__ == '__main__':
2253  tool = AftlTool()
2254  tool.run(sys.argv)
2255