• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3'''
4 * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16'''
17
18import binascii
19import hashlib
20import os
21import re
22import stat
23import struct
24import subprocess
25import sys
26import bisect
27import shutil
28import random
29
30
# Module-level table of option names and their initial values.
_params = {
    'partition': None,
    'partition_size': 0,
    'image': None,
    'verity_type': 'hash',
    'algorithm': 'SHA256_RSA3072',
    'userid': None,
    'rollback_location': None,
    'rollback_index': None,
    'salt': None,
    'pubkey': None,
    'privkey': None,
    'hash_algo': 'SHA256',
    'block_size': 4096,
    'fec_num_roots': 0,
    'pubkey_num_per_ptn': 1,
    'padding_size': None,
    'chain_partition': [],
    'signing_helper_with_files': None,
    # Kept as the string 'false' (not a bool), exactly as stored originally.
    'calc_max_image_size': 'false',
    'output': None,
}
53
54
# Maps each 'make_*_footer' name to the verity type string it stands for.
VERITY_TYPE = dict(
    make_hash_footer='hash',
    make_hashtree_footer='hashtree',
)
59
60
class Algorithm(object):
    """Plain record holding the parameters of one signature algorithm.

    Every constructor argument is stored unchanged under an attribute of the
    same name.
    """

    def __init__(self, sig_algo, hash_algo, bit_length, sig_bytes, hash_bytes, pubkey_bytes):
        # Identification of the signature scheme and the hash it is paired with.
        self.sig_algo, self.hash_algo, self.bit_length = sig_algo, hash_algo, bit_length
        # Sizes (in bytes) of the signature, the digest and the public key blob.
        self.sig_bytes, self.hash_bytes, self.pubkey_bytes = sig_bytes, hash_bytes, pubkey_bytes
69
70
# Supported signature algorithms, keyed by name.
# For the RSA entries pubkey_bytes is 8 bytes plus two values of bit_length bits each.
ALGORITHMS = {
    'SHA256_RSA3072': Algorithm(
        sig_algo=0,
        hash_algo='sha256',
        bit_length=3072,
        sig_bytes=384,
        hash_bytes=32,
        pubkey_bytes=8 + 2 * 3072 // 8,
    ),
    'SHA256_RSA4096': Algorithm(
        sig_algo=1,
        hash_algo='sha256',
        bit_length=4096,
        sig_bytes=512,
        hash_bytes=32,
        pubkey_bytes=8 + 2 * 4096 // 8,
    ),
    'SHA256_RSA2048': Algorithm(
        sig_algo=2,
        hash_algo='sha256',
        bit_length=2048,
        sig_bytes=256,
        hash_bytes=32,
        pubkey_bytes=8 + 2 * 2048 // 8,
    ),
    'SM2': Algorithm(
        sig_algo=3,
        hash_algo='SM3',
        bit_length=256,    # bit
        sig_bytes=64,
        hash_bytes=32,
        pubkey_bytes=64,
    ),
}
104
# Hash algorithm name -> numeric id; the id is the position in this tuple.
HASH_ALGORITHM = {
    algo_name: algo_id
    for algo_id, algo_name in enumerate(('sha256', 'sha128', 'sha512', 'SM3'))
}
111
112
def round_to_multiple(num, size):
    """Round num up to the next multiple of size.

    A value that is already a multiple of size is returned unchanged.
    """
    overshoot = num % size
    return num if overshoot == 0 else num + size - overshoot
118
119
def create_random_data(length=64):
    """Return a string of `length` lowercase hexadecimal digits.

    Each digit is picked with random.randint(), i.e. from the module-level
    pseudo-random generator of the `random` module.
    """
    alphabet = '0123456789abcdef'
    top_index = len(alphabet) - 1
    return ''.join([alphabet[random.randint(0, top_index)] for _ in range(length)])
127
128
def encode_long(num_bits, value):
    """Serialize value as a big-endian bytearray covering num_bits bits.

    One byte is produced per 8 bits, most significant first; bits of value
    above num_bits are dropped because each byte is masked with 0xff.
    """
    # Shift amounts run from the top byte (num_bits - 8) down in steps of 8.
    return bytearray((value >> shift) & 0xff for shift in range(num_bits - 8, -8, -8))
135
136
def decode_long(blob):
    """Interpret blob as an unsigned big-endian integer (empty input gives 0)."""
    return int.from_bytes(bytearray(blob), 'big')
143
144
def calc_hashtree_size(size, block_size, digest_size):
    """Compute the layout of a hash tree over `size` bytes of data.

    A level is added for as long as the data to be hashed is larger than one
    block; each level holds one digest per whole block of the level below,
    rounded up to a multiple of block_size.

    Returns:
      A tuple (level_offsets, treesize). level_offsets[i] is the sum of the
      sizes of all levels after level i, and treesize is the sum of all
      level sizes. Both are empty/zero when size <= block_size.
    """
    level_sizes = []
    pending = size
    while pending > block_size:
        level_size = round_to_multiple((pending // block_size) * digest_size, block_size)
        level_sizes.append(level_size)
        pending = level_size

    # Each level is placed in front of all the levels computed before it.
    level_offsets = [sum(level_sizes[idx + 1:]) for idx in range(len(level_sizes))]
    return level_offsets, sum(level_sizes)
166
167
def get_sm3_digest(handle):
    """Return the SM3 digest of a file, computed by the external openssl tool.

    Arguments:
      handle: Path of the file to hash; it is passed as the input file to
        'openssl dgst -SM3'.

    Returns:
      The digest as raw bytes, decoded from the hex text that follows the
      last '=' in openssl's output.

    Raises:
      HvbError: If openssl does not finish within 10 seconds or exits with
        a non-zero status.
    """
    cmds = ['openssl', 'dgst', '-SM3', handle]
    p = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        (pout, _) = p.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        # communicate() does not stop the child on timeout: kill it and
        # collect its pipes so no process is left behind.
        p.kill()
        p.communicate()
        raise HvbError("Calculate the hash using SM3 timeout!")
    if p.returncode != 0:
        raise HvbError("Failed to calculate the hash using SM3.")
    return binascii.a2b_hex(pout.split(b'=')[-1].strip())
176
177
def get_sha256_digest(hash_algo, salt, content):
    """Return the digest of salt followed by content.

    Despite the function name, the algorithm is whatever hashlib name is
    given in hash_algo.
    """
    digest_ctx = hashlib.new(hash_algo, salt)
    digest_ctx.update(content)
    return digest_ctx.digest()
182
183
def generate_sha_hash_tree(image, image_size, block_size, hash_algo, salt, hash_level_offsets, tree_size):
    """Build a salted hash tree over an image using a hashlib algorithm.

    Level 0 holds one digest per block_size bytes of the image; every further
    level holds one digest per block of the level below. A short final block
    is zero-padded before hashing, and each level is zero-padded to a multiple
    of block_size before being stored at hash_level_offsets[level].

    Arguments:
      image: Object providing seek(offset) and read(size) over the image data.
      image_size: Number of image bytes to cover.
      block_size: Size in bytes of one hashed block.
      hash_algo: Algorithm name accepted by hashlib.new().
      salt: Bytes fed to every hasher before the block data.
      hash_level_offsets: Offset of each level inside the returned tree.
      tree_size: Total size in bytes of the returned tree.

    Returns:
      A tuple (root_digest, hash_tree). root_digest is the salted digest of
      the top level; hash_tree is the tree as bytes.

    Raises:
      HvbError: If the data ends before image_size bytes have been hashed.
    """
    hash_ret = bytearray(tree_size)
    hash_src_size = image_size
    level_num = 0

    if hash_src_size <= block_size:
        # The image fits in a single block, so the loop below never runs and
        # there is no tree level. The root digest is then taken over that one
        # (zero-padded) block, following the same padding rule as the loop.
        image.seek(0)
        level_output = image.read(hash_src_size)
        level_output += b'\0' * (block_size - len(level_output))

    while hash_src_size > block_size:
        level_output_list = []
        remaining = hash_src_size
        while remaining > 0:
            hasher = hashlib.new(hash_algo, salt)
            if level_num == 0:
                # Level 0 is read from the image itself.
                image.seek(hash_src_size - remaining)
                data = image.read(min(remaining, block_size))
            else:
                # Higher levels hash the digests written for the level below.
                offset = hash_level_offsets[level_num - 1] + hash_src_size - remaining
                data = hash_ret[offset : offset + block_size]
            if not data:
                # Without this guard "remaining" would never shrink again.
                raise HvbError('Unexpected end of data while building the hash tree.')
            hasher.update(data)

            remaining -= len(data)
            if len(data) < block_size:
                hasher.update(b'\0' * (block_size - len(data)))
            level_output_list.append(hasher.digest())

        level_output = b''.join(level_output_list)
        level_output_padding = round_to_multiple(len(level_output), block_size) - len(level_output)
        level_output += b'\0' * level_output_padding

        offset = hash_level_offsets[level_num]
        hash_ret[offset : offset + len(level_output)] = level_output

        hash_src_size = len(level_output)
        level_num += 1

    hasher = hashlib.new(hash_algo, salt)
    hasher.update(level_output)

    return hasher.digest(), bytes(hash_ret)
224
225
def _sm3_of_block(tmp_path, data):
    """Write data to tmp_path (replacing any existing file) and return its SM3 digest."""
    if os.path.exists(tmp_path):
        os.remove(tmp_path)
    with open(tmp_path, 'wb') as fd:
        fd.write(data)
    return get_sm3_digest(tmp_path)


def generate_sm3_hash_tree(image, image_size, block_size, hash_level_offsets, tree_size):
    """Build an (unsalted) SM3 hash tree over an image.

    The tree has the same shape as the one built by generate_sha_hash_tree():
    level 0 holds one digest per block_size bytes of the image, every further
    level one digest per block of the level below, short blocks are
    zero-padded and each level is zero-padded to a multiple of block_size.
    Every block is written to the file 'hash_<pid>.bin' in the current
    directory and hashed with get_sm3_digest().

    Arguments:
      image: Object providing seek(offset) and read(size) over the image data.
      image_size: Number of image bytes to cover.
      block_size: Size in bytes of one hashed block.
      hash_level_offsets: Offset of each level inside the returned tree.
      tree_size: Total size in bytes of the returned tree.

    Returns:
      A tuple (root_digest, hash_tree). root_digest is the SM3 digest of the
      top level; hash_tree is the tree as bytes.

    Raises:
      HvbError: If the data ends before image_size bytes have been hashed, or
        if get_sm3_digest() fails.
    """
    hash_ret = bytearray(tree_size)
    hash_src_size = image_size
    level_num = 0
    tmp = 'hash_{}.bin'.format(os.getpid())

    if hash_src_size <= block_size:
        # The image fits in a single block, so the loop below never runs and
        # there is no tree level. The root digest is then taken over that one
        # (zero-padded) block, following the same padding rule as the loop.
        image.seek(0)
        level_output = image.read(hash_src_size)
        level_output += b'\0' * (block_size - len(level_output))

    while hash_src_size > block_size:
        level_output_list = []
        remaining = hash_src_size
        while remaining > 0:
            if level_num == 0:
                # Level 0 is read from the image itself.
                image.seek(hash_src_size - remaining)
                data = image.read(min(remaining, block_size))
            else:
                # Higher levels hash the digests written for the level below.
                offset = hash_level_offsets[level_num - 1] + hash_src_size - remaining
                data = hash_ret[offset : offset + block_size]
            if not data:
                # Without this guard "remaining" would never shrink again.
                raise HvbError('Unexpected end of data while building the hash tree.')

            remaining -= len(data)
            block = bytes(data) + b'\0' * (block_size - len(data))
            level_output_list.append(_sm3_of_block(tmp, block))

        level_output = b''.join(level_output_list)
        level_output_padding = round_to_multiple(len(level_output), block_size) - len(level_output)
        level_output += b'\0' * level_output_padding

        offset = hash_level_offsets[level_num]
        hash_ret[offset : offset + len(level_output)] = level_output

        hash_src_size = len(level_output)
        level_num += 1

    # NOTE(review): the temporary file is left on disk after the last digest,
    # as before; confirm whether a caller cleans it up before removing it here.
    root_digest = _sm3_of_block(tmp, level_output)

    return root_digest, bytes(hash_ret)
282
283
class HvbFooter(object):
    """Decoded HVB footer.

    The footer consists of an 8-byte magic, four 64-bit fields (cert offset,
    cert size, image size, partition size) and 64 reserved bytes.
    """

    FOOTERMAGIC = b'HVB' + b'\0' * 5
    FOOTER_FORMAT = ('8s'  # Magic
                     '4Q'  # Cert offset, Cert size, Image_size, Partition_size
                     '64s'  # Reserved
                     )

    def __init__(self, footer=None):
        """Unpack the given footer bytes into attributes.

        Raises:
          HvbError: If footer is None/empty or its magic is not FOOTERMAGIC.
          struct.error: If footer does not have the size of FOOTER_FORMAT.
        """
        self.foot = footer
        if not self.foot:
            raise HvbError('Given footer is None.')

        (self.magic, self.certoffset, self.certsize, self.imagesize,
         self.partition_size, _) = struct.unpack(self.FOOTER_FORMAT, footer)
        if self.magic != self.FOOTERMAGIC:
            raise HvbError('Given footer does not look like a HVB footer.')

    def info_footer(self):
        """Print the footer fields in human-readable form."""
        msg = "[HVB footer]: \n"
        if self.foot:
            # The label used to be misspelled "Maigc".
            msg += "\tMagic:                   {}\n".format((self.magic).decode())
            msg += "\tCert offset:             {} bytes\n".format(hex(self.certoffset))
            msg += "\tCert size:               {} bytes\n".format(self.certsize)
            msg += "\tImage size:              {} bytes\n".format(self.imagesize)
            msg += "\tPartition size:          {} bytes\n\n".format(self.partition_size)
        else:
            msg += "There is no footer. \n\n"
        print(msg)
312
313
class HvbCert(object):
    """Parsed view of a binary HVB certificate.

    The constructor decodes the fields of the blob, which sit at fixed
    offsets, into attributes. The other methods print those fields or check
    the certificate's signature and the digest of an image against it.
    """

    CERTMAGIC = b'HVB\0'
    # Signature algorithm id stored in the cert -> key of the ALGORITHMS table.
    ALGORITHM_TYPES = {0 : 'SHA256_RSA3072',
                       1 : 'SHA256_RSA4096',
                       2 : 'SHA256_RSA2048',
                       3 : 'SM2'
                    }

    # Hash algorithm id stored in the cert -> algorithm name.
    HASH_ALGORITHMS = {0 : 'SHA256',
                       1 : 'SHA128',
                       2 : 'SHA512',
                       3 : 'SM3'
                       }

    def __init__(self, cert=None):
        """Decode the certificate bytes.

        As a side effect the raw certificate is written to 'cert.bin' in the
        current directory.

        Raises:
          HvbError: If cert is None/empty or does not start with CERTMAGIC.
        """
        self.cert = cert
        # Validate first: writing None to the dump file used to raise a
        # TypeError before this error could be reported.
        if not self.cert:
            raise HvbError('Given cert is None.')

        # O_TRUNC keeps the tail of an older, longer dump from surviving.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open('cert.bin', flags, modes), 'wb') as cert_fd:
            cert_fd.write(self.cert)

        self.mgc, self.major, self.minor = struct.unpack('4s2I', self.cert[0:12])
        self.version = '{}.{}'.format(self.major, self.minor)
        if self.mgc != self.CERTMAGIC:
            raise HvbError('Given cert does not look like a HVB cert.')
        self.img_org_len, self.img_len, self.partition = struct.unpack('2Q64s', self.cert[48:128])
        self.rollback_location, self.rollback_index = struct.unpack('2Q', self.cert[128:144])

        verity, self.hash_algo = struct.unpack('2I', self.cert[144:152])
        self.verity_type = 'hash' if verity == 1 else 'hashtree'
        self.salt_offset, self.salt_size = struct.unpack('2Q', self.cert[152:168])

        self.digest_offset, self.digest_size, self.hashtree_offset, self.hashtree_size, \
                    self.data_block_size, self.hash_block_size, self.fec_num_roots, \
                    self.fec_offset, self.fec_size = struct.unpack('9Q', self.cert[168:240])

        # struct.unpack returns tuples, so salt and digest are 1-tuples and
        # are read as self.salt[0] / self.digest[0] elsewhere in the class.
        self.salt = struct.unpack('{}s'.format(self.salt_size), self.cert[240:240 + self.salt_size])
        self.digest = struct.unpack('{}s'.format(self.digest_size), \
                                    self.cert[240 + self.salt_size : 240 + self.salt_size + self.digest_size])

        # The signature info follows the hash payload (salt + digest).
        hash_payload_size = self.salt_size + self.digest_size
        signature_info_offset = 240 + hash_payload_size
        self.algo, self.flags, self.key_offset, self.key_len = struct.unpack('2I2Q', \
                                self.cert[signature_info_offset + 8 : signature_info_offset + 8 + 24])
        self.sig_content_offset, self.sig_content_bytes = struct.unpack('2Q', \
                                self.cert[signature_info_offset + 32 : signature_info_offset + 32 + 16])
        self.userid_offset, self.userid_len = struct.unpack('QI', \
                                self.cert[signature_info_offset + 48 : signature_info_offset + 48 + 12])
        print("userid_offset: {}, userid_len: {}".format(self.userid_offset, self.userid_len))

        # Key, signature and user id are stored back to back after the
        # 112-byte signature info.
        key_start = signature_info_offset + 112
        self.key = self.cert[key_start : key_start + self.key_len]
        userid_start = key_start + self.key_len + self.sig_content_bytes
        self.user_id = self.cert[userid_start : userid_start + self.userid_len]

    def info_cert(self):
        """Print the certificate fields in human-readable form.

        Raises:
          HvbError: If the hash or signature algorithm id is unknown.
        """
        msg = "[HVB cert]: \n"
        if self.cert:
            msg += "\tHVB tool version:           hvb tool {}\n".format(self.version)
            msg += "\tOriginal Image length:      {} bytes\n".format(self.img_org_len)
            msg += "\tImage length:               {} bytes (4K alignment)\n".format(self.img_len)
            msg += "\tPartition name:             {}\n".format(self.partition.decode())
            msg += "\tverity type(hash/hashtree): {}\n".format(self.verity_type)
            msg += "\tsalt size:                  {} bytes\n".format(self.salt_size)
            if self.hash_algo not in self.HASH_ALGORITHMS:
                raise HvbError("Unknown hash algorithm: {}".format(self.hash_algo))
            msg += "\tHash algorithm:             {}\n".format(self.HASH_ALGORITHMS[self.hash_algo])
            msg += "\tdigest size:                {} bytes\n".format(self.digest_size)
            msg += "\thashtree size:              {}\n".format(self.hashtree_size)
            msg += "\tfec size:                   {}\n".format(self.fec_size)
            msg += "\thashpayload:\n"
            msg += "\t\tsalt:               {}\n".format((binascii.b2a_hex(self.salt[0]).decode()))
            msg += "\t\tdigest:             {}\n".format((binascii.b2a_hex(self.digest[0]).decode()))
            if self.hashtree_size != 0:
                msg += "\t\thashtree offset: 0x{:x}\n".format(self.hashtree_offset)
            if self.fec_size != 0:
                msg += "\t\tfec offset:      0x{:x}\n".format(self.fec_offset)
            if self.algo not in self.ALGORITHM_TYPES:
                raise HvbError("Unknown algorithm type: {}".format(self.algo))
            msg += "\tAlgorithm:                  {}\n".format(self.ALGORITHM_TYPES[self.algo])
            msg += "\tUser id:                    {}\n".format(self.user_id.decode())
            if self.ALGORITHM_TYPES[self.algo] == 'SM2':
                msg += "\tPublic key:                 {}\n\n".format(binascii.b2a_hex(self.key).decode())
            else:
                msg += "\tPublic key (sha256):        {}\n\n".format(hashlib.sha256(self.key).hexdigest())
        else:
            msg += 'There is no certificate.\n\n'
        print(msg)

    def verify_signature(self, pubkey):
        """Check the certificate signature with the external openssl tool.

        The signed part (cert[0:sig_content_offset]) and the signature are
        written to two temporary files in the current directory, verified
        with 'openssl dgst -verify' (RSA-PSS, SHA-256) and removed again.

        Arguments:
          pubkey: Path of the public key file handed to openssl.

        Returns:
          True if openssl reports 'Verified OK', False otherwise.

        Raises:
          HvbError: If the algorithm id is unknown, openssl does not finish
            within 10 seconds, or openssl exits with a non-zero status.
        """
        if self.algo not in self.ALGORITHM_TYPES:
            raise HvbError("Unknown algorithm: {}".format(self.algo))
        algorithm_type = self.ALGORITHM_TYPES.get(self.algo)
        if algorithm_type not in ALGORITHMS:
            raise HvbError("Unknown algorithm type: {}".format(algorithm_type))
        to_be_sig_content = self.cert[0 : self.sig_content_offset]
        sig_content = self.cert[self.sig_content_offset : self.sig_content_offset + self.sig_content_bytes]

        to_be_sig_content_bin = os.sep.join([os.getcwd(), 'tmp.{0}.bin'.format(os.getpid())])
        sig_content_bin = os.sep.join([os.getcwd(), 'sigcontent.{0}.bin'.format(os.getpid())])
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        modes = stat.S_IWUSR | stat.S_IRUSR
        try:
            with os.fdopen(os.open(to_be_sig_content_bin, flags, modes), 'wb') as tmp_fd:
                tmp_fd.write(to_be_sig_content)
            with os.fdopen(os.open(sig_content_bin, flags, modes), 'wb') as tmp_fd:
                tmp_fd.write(sig_content)
            cmds = ['openssl', 'dgst', '-verify', pubkey, '-sigopt', 'rsa_padding_mode:pss',
                   '-sigopt', 'rsa_pss_saltlen:32', '-sha256', '-signature', sig_content_bin, to_be_sig_content_bin]
            p = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                (pout, perr) = p.communicate(timeout=10)
            except subprocess.TimeoutExpired:
                # communicate() does not stop the child on timeout.
                p.kill()
                p.communicate()
                raise HvbError('Verify signature timeout!')
            retcode = p.returncode
        finally:
            # Remove the temporary files even when writing or openssl failed.
            for tmp_path in (sig_content_bin, to_be_sig_content_bin):
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
        print(pout.decode().strip())
        if retcode != 0:
            raise HvbError('Error verifying data: {}'.format(perr))
        if pout.decode().strip() != 'Verified OK':
            sys.stderr.write('Signature not correct\n')
            return False
        return True

    def verify_digest(self, image):
        """Recompute the digest of an image and compare it with the cert.

        For verity type 'hash' the salted digest of the first img_org_len
        bytes is computed; for 'hashtree' the root digest of the hash tree.
        In the hashtree case the image is first zero-padded to a multiple of
        its block size through image.append_raw().

        Arguments:
          image: For 'hash', an object with seek() and read(). For
            'hashtree' it must also provide block_size, img_size and
            append_raw().

        Returns:
          False if the verity type is invalid or the cert holds a non-empty
          digest that differs from the computed one, True otherwise.

        Raises:
          HvbError: If the hash algorithm id is unknown.
        """
        image_size = self.img_org_len
        hash_algo = self.HASH_ALGORITHMS.get(self.hash_algo)
        if hash_algo is None:
            raise HvbError("Unknown hash algorithm: {}".format(self.hash_algo))
        salt = self.salt[0]
        if self.verity_type == 'hash':
            image.seek(0)
            image_content = image.read(image_size)
            hasher = hashlib.new(hash_algo, salt)
            hasher.update(image_content)
            digest = hasher.digest()
        elif self.verity_type == 'hashtree':
            digest_size = len(hashlib.new(hash_algo, salt).digest())
            remainder = image.block_size - image.img_size % image.block_size
            if remainder != image.block_size:
                image.append_raw(b'\0' * remainder)
            level_offsets, tree_size = calc_hashtree_size(image_size, image.block_size, digest_size)
            if hash_algo == 'SM3':
                # generate_sm3_hash_tree() takes neither an algorithm name nor
                # a salt; passing hash_algo here used to raise a TypeError.
                digest, _ = generate_sm3_hash_tree(image, image_size, image.block_size,
                                                   level_offsets, tree_size)
            else:
                digest, _ = generate_sha_hash_tree(image, image_size, image.block_size,
                                                   hash_algo, salt, level_offsets, tree_size)
        else:
            print("[hvbtool][ERROR]: Invalid verity_type: {}".format(self.verity_type))
            return False
        if self.digest[0] and digest != self.digest[0]:
            return False
        return True
469
470
class SMPublicKey(object):
    """Public key read from a key file through 'openssl ec -pubin -text'."""

    def __init__(self, pubkey):
        """Run openssl on the key file and locate the 'pub:' section.

        Arguments:
          pubkey: Path of the public key file.

        Raises:
          HvbError: If openssl does not finish within 10 seconds or exits
            with a non-zero status.
          ValueError: If the openssl output has no 'pub:' line.
        """
        # The attribute is named "privkey" but holds the public key path;
        # the name is kept because it is part of the object's interface.
        self.privkey = pubkey
        self.pubkey = b''

        cmds = ['openssl', 'ec', '-pubin', '-in', pubkey, '-text']
        process = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            (out, err) = process.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed child and release its pipes.
            process.communicate()
            raise HvbError("Get public key timeout!")
        if process.wait() != 0:
            raise HvbError("Failed to get public key: {}".format(err))

        self.key_data = out.split(b'\n')
        self.pubkey_idx = self.key_data.index(b'pub:') + 1

    def get_public_key(self):
        """Return the 64-byte public key as raw bytes.

        The hex text is rebuilt from scratch on every call. It used to be
        appended to self.pubkey, so a second call returned a corrupted key.
        self.pubkey is still set to the complete hex text, prefix included.
        """
        # The key is printed as colon-separated hex pairs over 5 lines.
        hex_text = b''.join(
            b''.join(self.key_data[i].strip().split(b':'))
            for i in range(self.pubkey_idx, self.pubkey_idx + 5)
        )
        self.pubkey = hex_text

        # Drop the leading prefix, which occupies two hex characters.
        return binascii.a2b_hex(hex_text[2:])
494
495
class RSAPublicKey(object):
    """RSA public key read from a key file through the openssl tool."""

    MODULUS_PREFIX = b'modulus='
    BIT_LENGTH_KEYWORD = b'Public-Key:'

    def __init__(self, pubkey):
        """Read the bit length and the modulus of the key with openssl.

        Arguments:
          pubkey: Path of the public key file.

        Raises:
          HvbError: If openssl times out, exits with a non-zero status, or
            its output does not start with the modulus prefix.
        """
        self.pubkey = pubkey
        self.modulus_bits = self.get_bit_length(self.pubkey)
        cmds = ['openssl', 'rsa', '-in', pubkey, '-modulus', '-noout', '-pubin']
        process = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            (out, err) = process.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed child and release its pipes.
            process.communicate()
            raise HvbError("Get public key timeout!")
        if process.wait() != 0:
            raise HvbError("Failed to get public key: {}".format(err))

        if not out.lower().startswith(self.MODULUS_PREFIX):
            raise HvbError('Invalid modulus')

        self.modulus = out[len(self.MODULUS_PREFIX):].strip()
        self.modulusdata = int(self.modulus, 16)
        self.exponent = 65537

    def get_bit_length(self, pubkey):
        """Return the key size in bits printed by 'openssl rsa -text'.

        The number is taken from the first output line that contains
        BIT_LENGTH_KEYWORD; 0 is returned when no such line exists.

        Raises:
          HvbError: If openssl does not finish within 10 seconds.
        """
        bitlen = 0
        cmd = ['openssl', 'rsa',  '-inform', 'PEM',  '-in', pubkey,  '-pubin', '-text']
        child = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # communicate() waits for the child and closes its pipes; reading
        # child.stdout directly left the process unreaped.
        try:
            (out, _) = child.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            child.kill()
            child.communicate()
            raise HvbError("Get public key timeout!")
        for line in out.split(b'\n'):
            if self.BIT_LENGTH_KEYWORD in line:
                # Raw literal: '\d' is not a valid escape in a plain bytes literal.
                bitlen = int(re.findall(rb'\d+', line.split(self.BIT_LENGTH_KEYWORD)[-1])[0])
                break
        return bitlen

    def calc_egcd(self, num1, num2):
        """Extended Euclid: return (g, x, y) with num1 * x + num2 * y == g == gcd(num1, num2)."""
        if num1 == 0:
            return (num2, 0, 1)
        egcd_g, egcd_y, egcd_x = self.calc_egcd(num2 % num1, num1)
        return (egcd_g, egcd_x - (num2 // num1) * egcd_y, egcd_y)

    def calc_modinv(self, num1, modulo):
        """Return the inverse of num1 modulo `modulo`.

        Raises:
          HvbError: If num1 and modulo are not coprime.
        """
        modinv_gcd, modinv_x, _ = self.calc_egcd(num1, modulo)
        if modinv_gcd != 1:
            raise HvbError("modular inverse does not exist.")
        return modinv_x % modulo

    def get_public_key(self):
        """Return the serialized public key.

        The result is two big-endian 64-bit values (the key size in bits and
        n0 = 2**64 - modulus**-1 mod 2**64), followed by the modulus and by
        r*r mod modulus with r = 2**bit_length(modulus), each encoded in
        modulus_bits bits.
        """
        pkey_n0 = 2 ** 64 - self.calc_modinv(self.modulusdata, 2 ** 64)
        pkey_r = 2 ** self.modulusdata.bit_length()
        pkey_prr = bytes(encode_long(self.modulus_bits, pkey_r * pkey_r % self.modulusdata))
        modulus = bytes(encode_long(self.modulus_bits, self.modulusdata))

        return struct.pack('!QQ', self.modulus_bits, pkey_n0) + modulus + pkey_prr
550
551
class HvbError(Exception):
    """Error raised by this tool; the message is also printed when the error is created."""

    def __init__(self, message):
        print("[HvbError]: " + message)
        super().__init__(message)
556
557
class ImageChunk(object):
    """Description of one chunk of a sparse image, validated on creation."""

    CHUNK_HEADER_FORMAT = '<2H2I'
    CHUNK_HEADER_SIZE = struct.calcsize(CHUNK_HEADER_FORMAT)
    CHUNK_TYPE_RAW = 0xcac1
    CHUNK_TYPE_FILL = 0xcac2
    CHUNK_TYPE_DONT_CARE = 0xcac3
    CHUNK_TYPE_CRC32 = 0xcac4

    def __init__(self, chunk_type, chunk_offset, nsparsed_output_offset,
                 nsparsed_output_size, input_offset, fill_data):
        """Initializes an ImageChunk object.

            Arguments:
              chunk_type: One of CHUNK_TYPE_RAW, CHUNK_TYPE_FILL, or CHUNK_TYPE_DONT_CARE.
              chunk_offset: Offset in the sparse file where this chunk begins.
              nsparsed_output_offset: Offset in de-sparsified file.
              nsparsed_output_size: Number of bytes in output.
              input_offset: Offset in sparse file where the chunk data begins
                if the chunk is RAW, otherwise None.
              fill_data: Blob as bytes with data to fill if the chunk is FILL,
                otherwise None.

            Raises:
              HvbError: If the combination of chunk_type, input_offset and
                fill_data is not allowed.
            """
        self.chunk_type = chunk_type
        self.chunk_offset = chunk_offset
        self.nsparsed_chunk_offset = nsparsed_output_offset
        self.nsparsed_output_size = nsparsed_output_size
        self.sparsed_input_offset = input_offset
        self.fill_data = fill_data
        self._check_invariants()

    def _check_invariants(self):
        """Raise HvbError unless fill_data/input_offset fit the chunk type."""
        kind = self.chunk_type

        if kind == self.CHUNK_TYPE_RAW:
            if self.fill_data is not None:
                raise HvbError('RAW chunk cannot have fill_data set.')
            if not self.sparsed_input_offset:
                raise HvbError('RAW chunk must have input_offset set.')
            return

        if kind == self.CHUNK_TYPE_FILL:
            if self.fill_data is None:
                raise HvbError('FILL chunk must have fill_data set.')
            if self.sparsed_input_offset:
                raise HvbError('FILL chunk cannot have input_offset set.')
            return

        if kind == self.CHUNK_TYPE_DONT_CARE:
            if self.fill_data is not None:
                raise HvbError('DONT_CARE chunk cannot have fill_data set.')
            if self.sparsed_input_offset:
                raise HvbError('DONT_CARE chunk cannot have input_offset set.')
            return

        raise HvbError('Invalid chunk type')
603
604
class ImageHandle(object):
    """Handle on an image file that is checked for a sparse-image header when opened."""

    # Layout constants of the sparse image file header.
    # SIMAGE_MAGIC is the value of the header's magic field for a sparse image,
    # SIMAGE_HEADER_FORMAT the struct format of the whole header.
    SIMAGE_MAGIC = 0xed26ff3a
    SIMAGE_HEADER_FORMAT = '<I4H4I'
    SIMAGE_HEADER_SIZE = struct.calcsize(SIMAGE_HEADER_FORMAT)
610
611    def __init__(self, file_name):
612        self.image_file = file_name
613        self.is_sparse = False
614        self.block_size = 4096    # A block size is 4096 bytes.
615        self.total_blks = 0
616        self.total_chunks = 0
617        self.header_analyze()
618
619    def header_analyze(self):
620        self.img_handler = open(self.image_file, 'r+b')
621        self.img_handler.seek(0, os.SEEK_END)
622        self.img_size = self.img_handler.tell()
623        print("Initial image length: ", self.img_size)
624
625        self.img_handler.seek(0, os.SEEK_SET)
626        header = self.img_handler.read(self.SIMAGE_HEADER_SIZE)
627
628        """ Sparse header
629            magic                  0xed26ff3a
630            major_version          (0x1) - reject images with higher major versions
631            minor_version          (0x0) - allow images with higer minor versions
632            file_hdr_sz            28 bytes for first revision of the file format
633            chunk_hdr_sz           12 bytes for first revision of the file format
634            blk_sz                 block size in bytes, must be a multiple of 4 (4096)
635            total_blks             total blocks in the non-sparse output image
636            total_chunks           total chunks in the sparse input image
637            image_checksum         CRC32 checksum of the original data, counting "don't care
638                                   as 0. Standard 802.3 polynomial, use a Public Domain
639                                   table implementation
640        """
641        (magic, major_version, minor_version, file_hdr_sz,
642            chunk_hdr_sz, block_size, self.total_blks, self.total_chunks,
643            img_checksum) = struct.unpack(self.SIMAGE_HEADER_FORMAT, header)
644        if magic != self.SIMAGE_MAGIC:
645            return
646
647        self.block_size = block_size
648        print("It's a sparse image.")
649        if self.SIMAGE_HEADER_SIZE != file_hdr_sz:
650            raise HvbError("Incorrect sparse image header size: {}".format(file_hdr_sz))
651        if ImageChunk.CHUNK_HEADER_SIZE != chunk_hdr_sz:
652            raise HvbError("Incorrect chunk header size: {}".format(chunk_hdr_sz))
653
654        self.chunks = list()
655        nsparsed_output_offset, nsparsed_chunk_nums = self.chunk_analyze()
656        self.sparse_end = self.img_handler.tell()
657
658        if self.total_blks != nsparsed_chunk_nums:
659            raise HvbError("The header said we should have {} output blocks, "
660                               'but we saw {}'.format(self.total_blks, nsparsed_chunk_nums))
661        if self.sparse_end != self.img_size:
662            raise HvbError("There were {} bytes of extra data at the end of the "
663                               "file.".format(self.img_size - self.sparse_end))
664
665        self.nsparsed_chunk_offset_list = [c.nsparsed_chunk_offset for c in self.chunks]
666        self.is_sparse = True
667        self.img_size = nsparsed_output_offset
668
669    def chunk_analyze(self):
670        nsparsed_output_offset = 0
671        nsparsed_chunk_nums = 0
672
673        for i in range(self.total_chunks):
674            chunk_offset = self.img_handler.tell()
675
676            """ chunk header
677                chunk_type             Type of this chunk (Raw, Fill, Dont care, CRC32)
678                chunk_sz               Size of the chunk before the sparse(The unit is blk_sz, that is, 4096)
679                total_sz               The size of the chunk after sparse, includes chunk_header and chunk data.
680            """
681            chunk_header = self.img_handler.read(ImageChunk.CHUNK_HEADER_SIZE)
682            (chunk_type, _, chunk_sz, total_sz) = struct.unpack(ImageChunk.CHUNK_HEADER_FORMAT, chunk_header)
683            chunk_data_size = total_sz - ImageChunk.CHUNK_HEADER_SIZE
684
685            if chunk_type == ImageChunk.CHUNK_TYPE_RAW:
686                if chunk_data_size != (chunk_sz * self.block_size):
687                    raise HvbError("Raw chunk size ({}) does not match output "
688                                    "size ({})".format(chunk_data_size, (chunk_sz * self.block_size)))
689                self.chunks.append(ImageChunk(ImageChunk.CHUNK_TYPE_RAW, chunk_offset,
690                                                  nsparsed_output_offset, chunk_sz * self.block_size,
691                                                  self.img_handler.tell(), None))
692                self.img_handler.seek(chunk_data_size, os.SEEK_CUR)
693            elif chunk_type == ImageChunk.CHUNK_TYPE_FILL:
694                if chunk_data_size != 4:
695                    raise HvbError("Fill chunk should have 4 bytes of fill, but this "
696                                       "has {}".format(chunk_data_size))
697                fill_data = self.img_handler.read(4)
698                self.chunks.append(ImageChunk(ImageChunk.CHUNK_TYPE_FILL, chunk_offset,
699                                                  nsparsed_output_offset, chunk_sz * self.block_size,
700                                                  None, fill_data))
701            elif chunk_type == ImageChunk.CHUNK_TYPE_DONT_CARE:
702                if chunk_data_size != 0:
703                    raise HvbError("Don\'t care chunk input size is non-zero ({})".
704                                       format(chunk_data_size))
705                self.chunks.append(ImageChunk(ImageChunk.CHUNK_TYPE_DONT_CARE, chunk_offset,
706                                                  nsparsed_output_offset, chunk_sz * self.block_size,
707                                                  None, None))
708            elif chunk_type == ImageChunk.CHUNK_TYPE_CRC32:
709                if chunk_data_size != 4:
710                    raise HvbError("CRC32 chunk should have 4 bytes of CRC, but "
711                                       "this has {}".format(chunk_data_size))
712                self.img_handler.read(4)
713            else:
714                raise HvbError("Unknown chunk type: {}".format(chunk_type))
715
716            nsparsed_chunk_nums += chunk_sz
717            nsparsed_output_offset += chunk_sz * self.block_size
718
719        return nsparsed_output_offset, nsparsed_chunk_nums
720
721    def update_chunks_and_blocks(self):
722        """Helper function to update the image header.
723
724        The the |total_chunks| and |total_blocks| fields in the header
725        will be set to value of the |_num_total_blocks| and
726        |_num_total_chunks| attributes.
727
728        """
729        num_chunks_and_blocks_offset = 16
730        num_chunks_and_blocks_format = '<II'
731        self.img_handler.seek(num_chunks_and_blocks_offset, os.SEEK_SET)
732        self.img_handler.write(struct.pack(num_chunks_and_blocks_format,
733                                      self.total_blks, self.total_chunks))
734
735    def append_dont_care(self, num_bytes):
736        """Appends a DONT_CARE chunk to the sparse file.
737
738        The given number of bytes must be a multiple of the block size.
739
740        Arguments:
741          num_bytes: Size in number of bytes of the DONT_CARE chunk.
742
743        """
744        if num_bytes % self.block_size != 0:
745            raise HvbError("Given number of bytes must be a multiple of the block size.")
746
747        if not self.is_sparse:
748            self.img_handler.seek(0, os.SEEK_END)
749            # This is more efficient that writing NUL bytes since it'll add
750            # a hole on file systems that support sparse files (native
751            # sparse, not OpenHarmony sparse).
752            self.img_handler.truncate(self.img_handler.tell() + num_bytes)
753            self.header_analyze()
754            return
755
756        self.total_chunks += 1
757        self.total_blks += num_bytes // self.block_size
758        self.update_chunks_and_blocks()
759
760        self.img_handler.seek(self.sparse_end, os.SEEK_SET)
761        self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT,
762                                      ImageChunk.CHUNK_TYPE_DONT_CARE,
763                                      0,  # Reserved
764                                      num_bytes // self.block_size,
765                                      struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)))
766        self.header_analyze()
767
768    def append_raw(self, data):
769        """Appends a RAW chunk to the sparse file.
770
771        The length of the given data must be a multiple of the block size.
772
773        Arguments:
774          data: Data to append as bytes.
775
776        """
777        print("SELF>BLOCK_SIZE: ", self.block_size)
778        if len(data) % self.block_size != 0:
779            raise HvbError("Given data must be a multiple of the block size.")
780
781        if not self.is_sparse:
782            self.img_handler.seek(0, os.SEEK_END)
783            self.img_handler.write(data)
784            self.header_analyze()
785            return
786
787        self.total_chunks += 1
788        self.total_blks += len(data) // self.block_size
789        self.update_chunks_and_blocks()
790
791        self.img_handler.seek(self.sparse_end, os.SEEK_SET)
792        self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT,
793                                      ImageChunk.CHUNK_TYPE_RAW,
794                                      0,  # Reserved
795                                      len(data) // self.block_size,
796                                      len(data) +
797                                      struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)))
798        self.img_handler.write(data)
799        self.header_analyze()
800
801    def append_fill(self, fill_data, size):
802        """Appends a fill chunk to the sparse file.
803
804        The total length of the fill data must be a multiple of the block size.
805
806        Arguments:
807          fill_data: Fill data to append - must be four bytes.
808          size: Number of chunk - must be a multiple of four and the block size.
809
810        """
811        if len(fill_data) != 4 or size % 4 != 0 or size % self.block_size != 0:
812            raise HvbError("The total length of the fill data must be a multiple of the block size.")
813
814        if not self.is_sparse:
815            self.img_handler.seek(0, os.SEEK_END)
816            self.img_handler.write(fill_data * (size // 4))
817            self.header_analyze()
818            return
819
820        self.total_chunks += 1
821        self.total_blks += size // self.block_size
822        self.update_chunks_and_blocks()
823
824        self.img_handler.seek(self.sparse_end, os.SEEK_SET)
825        self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT,
826                                      ImageChunk.CHUNK_TYPE_FILL,
827                                      0,  # Reserved
828                                      size // self.block_size,
829                                      4 + struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)))
830        self.img_handler.write(fill_data)
831        self.header_analyze()
832
833    def seek(self, offset):
834        """Sets the cursor position for reading from unsparsified file.
835
836        Arguments:
837          offset: Offset to seek to from the beginning of the file.
838
839        Raises:
840          RuntimeError: If the given offset is negative.
841        """
842        if offset < 0:
843            raise RuntimeError('Seeking with negative offset: {}'.format(offset))
844        self._file_pos = offset
845
846    def read(self, size):
847        """Reads data from the unsparsified file.
848
849        This method may return fewer than |size| bytes of data if the end
850        of the file was encountered.
851
852        The file cursor for reading is advanced by the number of bytes
853        read.
854
855        Arguments:
856          size: Number of bytes to read.
857
858        Returns:
859          The data as bytes.
860        """
861        if not self.is_sparse:
862            self.img_handler.seek(self._file_pos)
863            data = self.img_handler.read(size)
864            self._file_pos += len(data)
865            return data
866
867        # Iterate over all chunks.
868        chunk_idx = bisect.bisect_right(self.nsparsed_chunk_offset_list,
869                                        self._file_pos) - 1
870        data = bytearray()
871        to_go = size
872        while to_go > 0:
873            chunk = self.chunks[chunk_idx]
874            chunk_pos_offset = self._file_pos - chunk.nsparsed_chunk_offset
875            chunk_pos_to_go = min(chunk.nsparsed_output_size - chunk_pos_offset, to_go)
876
877            if chunk.chunk_type == ImageChunk.CHUNK_TYPE_RAW:
878                self.img_handler.seek(chunk.sparsed_input_offset + chunk_pos_offset)
879                data.extend(self.img_handler.read(chunk_pos_to_go))
880            elif chunk.chunk_type == ImageChunk.CHUNK_TYPE_FILL:
881                all_data = chunk.fill_data * (chunk_pos_to_go // len(chunk.fill_data) + 2)
882                offset_mod = chunk_pos_offset % len(chunk.fill_data)
883                data.extend(all_data[offset_mod:(offset_mod + chunk_pos_to_go)])
884            else:
885                if chunk.chunk_type != ImageChunk.CHUNK_TYPE_DONT_CARE:
886                    raise HvbError("Invalid chunk type!")
887                data.extend(b'\0' * chunk_pos_to_go)
888
889            to_go -= chunk_pos_to_go
890            self._file_pos += chunk_pos_to_go
891            chunk_idx += 1
892            # Generate partial read in case of EOF.
893            if chunk_idx >= len(self.nsparsed_chunk_offset_list):
894                break
895
896        return bytes(data)
897
898    def tell(self):
899        """Returns the file cursor position for reading from unsparsified file.
900
901        Returns:
902          The file cursor position for reading.
903        """
904        return self._file_pos
905
906    def truncate(self, size):
907        """Truncates the unsparsified file.
908
909        Arguments:
910          size: Desired size of unsparsified file.
911
912        Raises:
913          ValueError: If desired size isn't a multiple of the block size.
914        """
915
916        if not self.is_sparse:
917            self.img_handler.truncate(size)
918            self.header_analyze()
919            return
920
921        if size % self.block_size != 0:
922            raise ValueError('Cannot truncate to a size which is not a multiple '
923                             'of the block size')
924
925        if size == self.img_size:
926            # Trivial where there's nothing to do.
927            return
928
929        if size < self.img_size:
930            chunk_idx = bisect.bisect_right(self.nsparsed_chunk_offset_list, size) - 1
931            chunk = self.chunks[chunk_idx]
932            if chunk.nsparsed_chunk_offset != size:
933                # Truncation in the middle of a trunk - need to keep the chunk
934                # and modify it.
935                chunk_idx_for_update = chunk_idx + 1
936                num_to_keep = size - chunk.nsparsed_chunk_offset
937                if num_to_keep % self.block_size != 0:
938                    raise HvbError("Remainder size of bytes must be multiple of the block size.")
939
940                if chunk.chunk_type == ImageChunk.CHUNK_TYPE_RAW:
941                    truncate_at = (chunk.nsparsed_chunk_offset +
942                                   struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT) + num_to_keep)
943                    data_sz = num_to_keep
944                elif chunk.chunk_type == ImageChunk.CHUNK_TYPE_FILL:
945                    truncate_at = (chunk.nsparsed_chunk_offset +
946                                   struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT) + 4)
947                    data_sz = 4
948                elif chunk.chunk_type == ImageChunk.CHUNK_TYPE_DONT_CARE:
949                    truncate_at = chunk.nsparsed_chunk_offset + struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)
950                    data_sz = 0
951                else:
952                    raise HvbError("Invalid chunk type.")
953                chunk_sz = num_to_keep // self.block_size
954                total_sz = data_sz + struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)
955                self.img_handler.seek(chunk.nsparsed_chunk_offset)
956                self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT,
957                                              chunk.chunk_type,
958                                              0,  # Reserved
959                                              chunk_sz, total_sz))
960                chunk.output_size = num_to_keep
961            else:
962                # Truncation at trunk boundary.
963                truncate_at = chunk.chunk_offset
964                chunk_idx_for_update = chunk_idx
965
966            self.total_chunks = chunk_idx_for_update
967            self.total_blks = 0
968            for i in range(0, chunk_idx_for_update):
969                self.total_blks += self.chunks[i].nsparsed_output_size // self.block_size
970            self.update_chunks_and_blocks()
971            self.img_handler.truncate(truncate_at)
972
973            # We've modified the file so re-read all data.
974            self.header_analyze()
975        else:
976            # Truncating to grow - just add a DONT_CARE section.
977            self.append_dont_care(size - self.img_size)
978
979
class HvbTool(object):
    """Command implementation for building and inspecting HVB-signed images.

    Configuration is taken from the module-level |_params| dictionary when
    an instance is created.
    """
    # Magic written at the start of the HVB certificate and of the footer.
    MAGIC = b'HVB\0'
    # Version numbers packed into the certificate header.
    HVB_VERSION_MAJOR = 1
    HVB_VERSION_MINOR = 1
    # Footer size in bytes: 8-byte magic field + four 64-bit values +
    # 64 reserved bytes (see hvb_footer_info).
    FOOTER_SIZE = 104
    # Reserved bytes that close the certificate header.
    VERITY_RESERVED = b'\0' * 36
    # Magic at the start of an rvt image.
    RVT_MAGIC = b'rot\0'

    # Space reserved for metadata when computing the maximum image size.
    MAX_RVT_SIZE = 64 * 1024
    MAX_FOOTER_SIZE = 4096
    # Number of reserved zero bytes following the counters in the rvt header.
    RVT_RESERVED = 60
991
992
993    def __init__(self):
994        self.img = _params['image']
995        self.partition = _params['partition']
996        self.partition_size = int(_params['partition_size'])
997        self.vertype = _params['verity_type'].lower()  # verity type: hash - 1 or hashtree - 2 (fix)
998        self.algo = _params['algorithm']
999        self.user_id = _params['userid']
1000        self.pubkey = _params['pubkey']
1001        self.privkey = _params['privkey']
1002        self.block_size = _params['block_size']
1003        self.padding_size = _params['padding_size']
1004        self.signing_helper_with_files = _params['signing_helper_with_files']
1005        self.pubkey_num_per_ptn = int(_params['pubkey_num_per_ptn'])
1006        self.calc_max_image_size = _params['calc_max_image_size'].lower()
1007        self.signed_img = _params['output']
1008        self.hash_algo = ALGORITHMS[self.algo].hash_algo
1009
1010        if _params['salt'] is not None:
1011            self.salt = binascii.a2b_hex(_params['salt'])
1012
1013        if self.algo == 'SM2' and self.user_id is None:
1014            self.user_id = create_random_data(16)    # Set user_id 16 bytes
1015            print("USERID: {}".format(self.user_id))
1016
1017        self.hashtree = b''
1018        self.digest = b''
1019        self.fec = b''
1020        self.hvb_cert_content = b''
1021
1022        self.hvb_calc_max_image()
1023        self.original_image_info()
1024
1025    def original_image_info(self):
1026        if self.img is None:
1027            return
1028
1029        self.image_handle = ImageHandle(self.img)
1030        self.original_image_length = self.image_handle.img_size
1031        # Image length algins to 4096 bytes
1032        self.img_len_with_padding = round_to_multiple(self.original_image_length, self.block_size)
1033        print("Image length(%s): %d bytes" % (self.img, self.original_image_length))
1034
1035    def hvb_header(self):
1036        self.hvb_cert_content = self.MAGIC
1037        self.hvb_cert_content += struct.pack('I', self.HVB_VERSION_MAJOR)
1038        self.hvb_cert_content += struct.pack('I', self.HVB_VERSION_MINOR)
1039        self.hvb_cert_content += self.VERITY_RESERVED
1040
1041    def hvb_image_info(self):
1042        max_partition_name_len = 64
1043        partition_name = (self.partition).encode('utf-8') + b'\0' * (max_partition_name_len - len(self.partition))
1044
1045        self.hvb_cert_content += struct.pack('Q', self.original_image_length)
1046        self.hvb_cert_content += struct.pack('Q', self.img_len_with_padding)
1047        self.hvb_cert_content += partition_name
1048        self.hvb_cert_content += struct.pack('2Q', int(_params['rollback_location']), int(_params['rollback_index']))
1049
1050    def image_hash(self):
1051        # 0: SHA256    1: SHA128    2: SHA512    3:SM3
1052        halgo = HASH_ALGORITHM[ALGORITHMS[self.algo].hash_algo]
1053        print("HALGO: {}".format(halgo))
1054
1055        print("digest: {}".format(binascii.b2a_hex(self.digest)))
1056        print("digest size: ", len(self.digest))
1057
1058        hash_algo = struct.pack('I', halgo)
1059        salt_offset = struct.pack('Q', 240)       # 根据HVB证书格式,salt偏移位置固定,为240字节的偏移
1060        salt_size = struct.pack('Q', len(self.salt))
1061        digest_offset = struct.pack('Q', 240 + len(self.salt))
1062        digest_size = struct.pack('Q', len(self.digest))
1063        return hash_algo + salt_offset + salt_size + digest_offset + digest_size
1064
1065    def create_hashtree(self, digest_size):
1066        size = self.image_handle.img_size
1067
1068        level_offsets, tree_size = calc_hashtree_size(size, self.block_size, digest_size)
1069        print("level_offsets: {}, tree_size: {}".format(level_offsets, tree_size))
1070        if self.hash_algo == 'SM3':
1071            rootdigest, self.hashtree = generate_sm3_hash_tree(self.image_handle, size, self.block_size, level_offsets, tree_size)
1072        else:
1073            rootdigest, self.hashtree = generate_sha_hash_tree(self.image_handle, size, self.block_size, self.hash_algo,
1074                                                       self.salt, level_offsets, tree_size)
1075        if len(self.hashtree) % self.block_size != 0:
1076            self.hashtree += b'\0' * (self.block_size - len(self.hashtree) % self.block_size)
1077        print("root digest: ", binascii.b2a_hex(rootdigest))
1078        return rootdigest
1079
1080    def image_hash_tree(self):
1081        image_hashtree = {"hashtree_offset": self.img_len_with_padding, \
1082                          "hashtree_size": len(self.hashtree), "data_block_size": self.block_size, \
1083                          "hash_block_size": self.block_size, "fec_num_roots": 0, \
1084                          "fec_offset": 240 + len(self.salt) + len(self.digest), "fec_size": 0}
1085        hashtree_struct = struct.Struct('Q' * len(image_hashtree))
1086        dlist = []
1087        for item in image_hashtree:
1088            dlist.append(image_hashtree[item])
1089        return hashtree_struct.pack(*dlist)
1090
1091    def hvb_hash_info(self):
1092        verity_type = 0
1093
1094        if self.vertype == 'hash':
1095            verity_type = 1    # hash: 1    hashtree: 2
1096
1097            if self.hash_algo == 'SM3':
1098                self.digest = get_sm3_digest(self.img)
1099            else:
1100                self.image_handle.seek(0)
1101                image_content = self.image_handle.read(self.image_handle.img_size)
1102                self.digest = get_sha256_digest(self.hash_algo, self.salt, image_content)
1103        elif self.vertype == 'hashtree':
1104            verity_type = 2    # hash: 1    hashtree: 2
1105
1106            if self.hash_algo == 'SM3':
1107                digest_size = ALGORITHMS[self.algo].hash_bytes
1108            else:
1109                hashtree_hasher = hashlib.new(self.hash_algo, self.salt)
1110                digest_size = len(hashtree_hasher.digest())
1111            digest_padding = 2 ** ((digest_size - 1).bit_length()) - digest_size
1112            print("hash_algo: {}, salt: {}".format(self.hash_algo, self.salt))
1113            print("digest_size: {}, digest_padding: {}".format(digest_size, digest_padding))
1114            remainder = self.block_size - self.image_handle.img_size % self.block_size
1115            if remainder != self.block_size:
1116                self.image_handle.append_raw(b'\0' * remainder)
1117            self.img_len_with_padding = self.image_handle.img_size
1118            self.digest = self.create_hashtree(digest_size + digest_padding)
1119        else:
1120            print("[hvbtool][ERROR]: Invalid verity_type: %d", self.vertype)
1121            sys.exit(1)
1122
1123        imghash = self.image_hash()
1124        imghashtree = self.image_hash_tree()
1125        hashpayload = self.salt + self.digest
1126
1127        hashinfo = struct.pack('I', verity_type) + imghash + imghashtree + hashpayload
1128        self.hashinfo_size = len(hashinfo)
1129        self.hvb_cert_content += hashinfo
1130
1131    def get_sign_command(self, sig_content_bin, to_be_sig_content_bin):
1132        if self.algo == 'SM2':
1133            cmd = [
1134                    'openssl', 'dgst',  '-SM3', '-sign', self.privkey, '-sigopt', 'distid:{}'.format(self.user_id),
1135                    '-out', sig_content_bin, to_be_sig_content_bin
1136            ]
1137        else:
1138            cmd = [
1139                    'openssl', 'dgst', '-sign', self.privkey, '-sigopt',  'rsa_padding_mode:pss',
1140                    '-sigopt', 'rsa_pss_saltlen:{}'.format(len(self.salt)), '-sha256', '-out',
1141                    sig_content_bin,  to_be_sig_content_bin
1142            ]
1143
1144        return cmd
1145
1146    def hvb_signature_info(self):
1147        sig_content_bin = os.sep.join([os.getcwd(), 'sigcontent.{0}.bin'.format(os.getpid())])
1148        to_be_sig_content_bin = os.sep.join([os.getcwd(), 'tmp.{0}.bin'.format(os.getpid())])
1149        # 签名算法  0:SHA256_RSA3072(默认值), 1:SHA256_RSA4096 , 2:SHA256_RSA2048
1150        if self.algo not in ALGORITHMS:
1151            raise HvbError("Unknown algorithm: {}".format(self.algo))
1152        algo = ALGORITHMS[self.algo]
1153        flags = 0       # 预留的flag标记,默认全为0
1154        user_id_len = 0
1155        userid = b''
1156
1157        if self.user_id is not None:
1158            user_id_len = len(self.user_id)
1159            userid = self.user_id.encode()
1160
1161        keyblock_offset = 144 + self.hashinfo_size + 112
1162
1163        try:
1164            if self.algo == "SM2":
1165                key = SMPublicKey(self.pubkey)
1166            else:
1167                key = RSAPublicKey(self.pubkey)
1168        except HvbError as err:
1169            sys.exit(1)
1170        keyblock = key.get_public_key()
1171
1172        signature_offset = keyblock_offset + len(keyblock)
1173        sig_length = len(self.hvb_cert_content) + 112 + len(keyblock)
1174        user_id_offset = signature_offset + algo.sig_bytes
1175
1176        print("user_id_offset: {}, user_id_len: {}".format(user_id_offset, user_id_len))
1177
1178        self.hvb_cert_content += struct.pack('Q', sig_length) + struct.pack('I', algo.sig_algo) \
1179                        + struct.pack('I', flags) + struct.pack('Q', keyblock_offset) \
1180                        + struct.pack('Q', len(keyblock)) + struct.pack('Q', signature_offset) \
1181                        + struct.pack('Q', algo.sig_bytes) + struct.pack('Q', user_id_offset)  \
1182                        + struct.pack('I', user_id_len) + b'\0' * 52 + keyblock    # reserved 52 bytes
1183
1184        if os.path.exists(sig_content_bin):
1185            os.remove(sig_content_bin)
1186
1187        flags = os.O_WRONLY | os.O_CREAT
1188        modes = stat.S_IWUSR | stat.S_IRUSR
1189        with os.fdopen(os.open(to_be_sig_content_bin, flags, modes), 'wb') as tmp_fd:
1190            tmp_fd.write(self.hvb_cert_content)
1191
1192        if self.signing_helper_with_files is not None:
1193            if len(self.salt) != 32:
1194                raise HvbError("Salt len must be 32 bit, but current salt len is {} bit!".format(len(self.salt)))
1195            p = subprocess.Popen([self.signing_helper_with_files, self.algo,
1196                                  self.pubkey, to_be_sig_content_bin, sig_content_bin])
1197            ret = p.wait()
1198            if ret != 0:
1199                raise HvbError("Failed to sign the image by {}".format(self.signing_helper_with_files))
1200        else:
1201            self.get_signature_content(sig_content_bin, to_be_sig_content_bin)
1202
1203        flags = os.O_RDONLY
1204        with os.fdopen(os.open(sig_content_bin, flags, modes), 'rb') as sig_fd:
1205            sigcontent = sig_fd.read()
1206
1207        self.hvb_cert_content += sigcontent + userid
1208
1209        if os.path.exists(sig_content_bin):
1210            os.remove(sig_content_bin)
1211        if os.path.exists(to_be_sig_content_bin):
1212            os.remove(to_be_sig_content_bin)
1213
1214    def get_signature_content(self, sig_content_bin, to_be_sig_content_bin):
1215        cmd = self.get_sign_command(sig_content_bin, to_be_sig_content_bin)
1216        ret = subprocess.call(cmd)
1217        if ret != 0:
1218            raise HvbError("Failed to sign the image.")
1219
1220        if self.algo == "SM2":
1221            flags = os.O_RDWR
1222            with os.fdopen(os.open(sig_content_bin, flags, stat.S_IWUSR | stat.S_IRUSR), 'r+b') as sig_fd:
1223                content = sig_fd.read()
1224                length = content[3]
1225                if length != 32 and length != 33:
1226                    raise HvbError("Invalid signature information.")
1227
1228                begin = 4 + length % 32
1229                end = begin + 32
1230                data1 = content[begin : end]
1231
1232                length = content[end + 1]
1233                if length != 32 and length != 33:
1234                    raise HvbError("Invalid signature information.")
1235                begin = end + 2 + length % 32
1236                end = begin + 32
1237                data2 = content[begin : end]
1238
1239                data = b''.join([data1, data2])
1240                sig_fd.seek(0)
1241                sig_fd.truncate()
1242                sig_fd.write(data)
1243
1244    def hvb_footer_info(self):
1245        self.footer = b''
1246        footer_magic = self.MAGIC + b'\0' * 4
1247        self.partition_size = int(self.partition_size)
1248
1249        cert_size = len(self.hvb_cert_content)
1250        cert_offset = self.img_len_with_padding + len(self.hashtree)
1251
1252        if self.padding_size is not None:
1253            cert_offset = self.partition_size - self.padding_size
1254
1255        print("cert_size: %x, cert_offset: %x, partition_size: %x" % (cert_size, cert_offset, self.partition_size))
1256        self.footer += footer_magic \
1257                       + struct.pack('4Q', cert_offset, cert_size, self.original_image_length, self.partition_size) \
1258                       + b'\0' * 64
1259
1260    def hvb_make_rvt_image(self):
1261        self.img = 'tmp_{0}_rvt.img'.format(os.getpid())
1262        rvtcontent = b''
1263        rvtcontent += self.RVT_MAGIC
1264        verity_num = len(_params['chain_partition'])
1265        if self.pubkey_num_per_ptn != 1 and self.pubkey_num_per_ptn != 2:
1266            print("invalid pubkey_num_per_ptn: %d" % (self.pubkey_num_per_ptn))
1267            sys.exit(1)
1268        if verity_num % self.pubkey_num_per_ptn != 0:
1269            print("verity_num doesnt match pubkey_num_per_ptn: %d" % (self.pubkey_num_per_ptn))
1270            sys.exit(1)
1271        rvtcontent += struct.pack('I', verity_num // self.pubkey_num_per_ptn)
1272        rvtcontent += struct.pack('I', self.pubkey_num_per_ptn) + b'\0' * self.RVT_RESERVED # rvt_reversed: 60 bytes
1273        cur_sizes = len(rvtcontent)
1274
1275        for index in range(0, verity_num, self.pubkey_num_per_ptn):
1276            chain_partition_data = _params['chain_partition'][index].split(':')
1277            partition_name = chain_partition_data[0].strip()
1278            pubkey = chain_partition_data[1].strip()
1279            if self.pubkey_num_per_ptn == 2:
1280                chain_partition_data_backup = _params['chain_partition'][index + 1].split(':')
1281                partition_name_backup = chain_partition_data_backup[0].strip()
1282                if partition_name != partition_name_backup:
1283                    print("partition name doesnt match for pubkey_num_per_ptn: %d" % (self.pubkey_num_per_ptn))
1284                    sys.exit(1)
1285                pubkey_backup = chain_partition_data_backup[1].strip()
1286
1287            try:
1288                if self.algo == 'SM2':
1289                    key = SMPublicKey(pubkey)
1290                    if self.pubkey_num_per_ptn == 2:
1291                        key_backup = SMPublicKey(pubkey_backup)
1292                else:
1293                    key = RSAPublicKey(pubkey)
1294                    if self.pubkey_num_per_ptn == 2:
1295                        key_backup = RSAPublicKey(pubkey_backup)
1296            except HvbError as err:
1297                sys.exit(1)
1298            pubkey_payload = key.get_public_key()
1299            if self.pubkey_num_per_ptn == 2:
1300                pubkey_payload_backup = key_backup.get_public_key()
1301            pubkey_len = len(pubkey_payload)
1302            pubkey_offset = cur_sizes + 80
1303            rvtcontent += partition_name.encode() + b'\0' * (64 - len(partition_name))    # partition_name
1304            rvtcontent += struct.pack('Q', pubkey_offset) # pubkey_offset
1305            rvtcontent += struct.pack('Q', pubkey_len)    # pubkey_len
1306            rvtcontent += pubkey_payload  # pubkey_payload
1307            if self.pubkey_num_per_ptn == 2:
1308                rvtcontent += pubkey_payload_backup  # pubkey_payload_backup
1309            cur_sizes += 80 + pubkey_len * self.pubkey_num_per_ptn
1310
1311        flags = os.O_WRONLY | os.O_RDONLY | os.O_CREAT
1312        modes = stat.S_IWUSR | stat.S_IRUSR
1313        with os.fdopen(os.open(self.img, flags, modes), 'wb') as rvt_fd:
1314            rvt_fd.write(rvtcontent)
1315
1316        self.original_image_info()
1317        self.hvb_make_image()
1318
1319        if os.path.exists(self.img):
1320            os.remove(self.img)
1321
1322    def hvb_combine_image_info(self):
1323        cert_and_footer = b''
1324        hashtree_length = len(self.hashtree)
1325        hvb_cert_length = len(self.hvb_cert_content)
1326        image = os.path.abspath(self.img)
1327        signed_image = os.path.abspath(self.signed_img)
1328
1329        shutil.copy(image, signed_image)
1330        image = ImageHandle(signed_image)
1331
1332        padding = round_to_multiple(image.img_size, self.block_size)
1333        if padding > image.img_size:
1334            if image.is_sparse is True:     # Original image size must be multiple of block_size
1335                raise HvbError("The sparse image size is not multiple of the block size.")
1336            image.truncate(padding)
1337
1338        padding = round_to_multiple((hvb_cert_length + self.FOOTER_SIZE), self.block_size)
1339        if padding > (hvb_cert_length + self.FOOTER_SIZE):
1340            cert_and_footer = self.hvb_cert_content + b'\0' * (padding - (self.FOOTER_SIZE + hvb_cert_length)) + \
1341                self.footer
1342        else:
1343            cert_and_footer = self.hvb_cert_content + self.footer
1344
1345        if self.partition_size < image.img_size + hashtree_length + len(cert_and_footer):
1346            raise HvbError("[hvbtool][ERROR]: Partition size is too small!")
1347
1348        cert_and_footer = self.hvb_cert_content + b'\0' * (self.partition_size - image.img_size - \
1349            hashtree_length - hvb_cert_length - self.FOOTER_SIZE) + self.footer
1350        image.append_raw(self.hashtree)
1351        image.append_raw(cert_and_footer)
1352        if image.img_handler:
1353            image.img_handler.close()
1354
1355    def parse_rvt_image(self, handle):
1356        handle.seek(0)
1357
1358        header = handle.read(12)
1359        magic, verity_num, pubkey_num_per_ptn = struct.unpack('4sII', header)
1360
1361        msg = ["[rvt info]: \n"]
1362        if magic != self.RVT_MAGIC:
1363            raise HvbError("It is not a valid rvt image.")
1364
1365        if pubkey_num_per_ptn == 0:
1366            pubkey_num_per_ptn = 1
1367            msg.append("\tuse old version pubkey_num_per_ptn\n")
1368        msg.append("\tpubkey_num_per_ptn:          {}\n".format(pubkey_num_per_ptn))
1369
1370        handle.seek(72)
1371        for i in range(verity_num):
1372            # The size of pubkey_des excludes pubkey_payload is 80 bytes
1373            data = handle.read(80)
1374            name, pubkey_offset, pubkey_len = struct.unpack('64s2Q', data)
1375            msg.append('\n\tChain Partition descriptor: \n')
1376            msg.append('\t\tPartition Name:      {}\n'.format(name.decode()))
1377            pubkey = handle.read(pubkey_len)
1378            msg.append("\t\tPublic key:          {}\n".format(hashlib.sha256(pubkey).hexdigest()))
1379            if pubkey_num_per_ptn == 2:
1380                pubkey_backup = handle.read(pubkey_len)
1381                msg.append("\t\tBackup Public key:   {}\n".format(hashlib.sha256(pubkey_backup).hexdigest()))
1382
1383        print(''.join(msg))
1384
1385    def hvb_calc_max_image(self):
1386        if self.calc_max_image_size == "true":
1387            max_metadata_size = self.MAX_RVT_SIZE + self.MAX_FOOTER_SIZE
1388
1389            if self.vertype == 'hashtree':
1390                hashtree_hasher = hashlib.new(self.hash_algo, self.salt)
1391                digest_size = len(hashtree_hasher.digest())
1392                digest_padding = 2 ** ((digest_size - 1).bit_length()) - digest_size
1393
1394                _, hashtree_size = calc_hashtree_size(self.partition_size, self.block_size, digest_size)
1395                max_metadata_size += hashtree_size
1396
1397            max_image_size = self.partition_size - max_metadata_size
1398            print("\ncalc_max_image_size: {}".format(max_image_size))
1399            sys.exit(0)
1400
1401    def hvb_make_image(self):
1402        self.hvb_header()
1403        self.hvb_image_info()
1404        self.hvb_hash_info()
1405        self.hvb_signature_info()
1406        self.hvb_footer_info()
1407        self.hvb_combine_image_info()
1408
1409    def hvb_parse_image(self):
1410        try:
1411            image = ImageHandle(self.img)
1412            image.seek(self.original_image_length - self.FOOTER_SIZE)
1413            footer_bin = image.read(self.FOOTER_SIZE)
1414            footer = HvbFooter(footer_bin)
1415            footer.info_footer()
1416
1417            image.seek(footer.certoffset)
1418            cert_bin = image.read(footer.certsize)
1419            cert = HvbCert(cert_bin)
1420            cert.info_cert()
1421
1422            if 'rvt' in cert.partition.decode():
1423                self.parse_rvt_image(image)
1424        except (HvbError, struct.error):
1425            raise HvbError("Failed to parse the image!")
1426        finally:
1427            if image.img_handler:
1428                image.img_handler.close()
1429
1430    def hvb_erase_image(self):
1431        try:
1432            image = ImageHandle(self.img)
1433            image.seek(self.original_image_length - self.FOOTER_SIZE)
1434            footer_bin = image.read(self.FOOTER_SIZE)
1435            footer = HvbFooter(footer_bin)
1436            image.seek(0)
1437            image.truncate(footer.imagesize)
1438        except (HvbError, struct.error):
1439            raise HvbError("Failed to erase image.")
1440        finally:
1441            if image.img_handler:
1442                image.img_handler.close()
1443
1444    def hvb_verify_image(self):
1445        try:
1446            key_blob = RSAPublicKey(self.pubkey).get_public_key()
1447            print('Verifying image {} using key at {}'.format(self.img, self.pubkey))
1448        except HvbError as err:
1449            print('Pubkey {} is invalid!'.format(self.pubkey))
1450            sys.exit(1)
1451
1452        try:
1453            image = ImageHandle(self.img)
1454            image.seek(self.original_image_length - self.FOOTER_SIZE)
1455            footer_bin = image.read(self.FOOTER_SIZE)
1456            footer = HvbFooter(footer_bin)
1457
1458            image.seek(footer.certoffset)
1459            cert_bin = image.read(footer.certsize)
1460            cert = HvbCert(cert_bin)
1461            if key_blob and key_blob != cert.key:
1462                raise HvbError('Embedded public key does not match given key.')
1463            if not cert.verify_signature(self.pubkey):
1464                raise HvbError('Failed check signature for {}!'.format(self.img))
1465            if not cert.verify_digest(image):
1466                raise HvbError('Failed check digest for {}!'.format(self.img))
1467            print('Successfully verified hvb cert for {}!'.format(self.img))
1468        except (HvbError, struct.error):
1469            raise HvbError("Failed to verify image.")
1470        finally:
1471            if image.img_handler:
1472                image.img_handler.close()
1473
1474
def print_help():
    """Print a short usage summary listing every supported action."""
    print("usage: hvbtool.py [-h]")
    print("\t\t{make_hash_footer, make_hashtree_footer, make_rvt_image, "
          "parse_image, erase_image, verify_image}")
1478
1479
def parse_arguments(argvs):
    """Parse *argvs* into the module-level ``_params`` and return the action.

    *argvs* is consumed two items at a time. The first pair is taken as
    (program name, action); every following pair is an option and its value.
    The option key is the option text with its first two characters removed
    and must already exist in ``_params``. ``chain_partition`` values are
    accumulated in a list, any other option overwrites the stored value.
    When no salt was supplied, one is generated with ``create_random_data``.

    Args:
        argvs: the full argument vector, e.g. ``sys.argv``.

    Returns:
        The action name with surrounding whitespace removed.

    The process exits with status 1 when *argvs* is empty, has an odd number
    of items, or contains an option that is not a key of ``_params``.
    """
    length = len(argvs)

    # An empty vector has no action to read, and an odd-sized one leaves an
    # option without its value.
    if length == 0 or length % 2 != 0:
        print_help()
        print("[hvbtool][ERROR]: invalid argument format!")
        sys.exit(1)

    pairs = [(argvs[i], argvs[i + 1]) for i in range(0, length, 2)]

    # Strip once so the membership test, the lookup and the returned value
    # all use the same text.
    act = pairs[0][1].strip()
    if act in VERITY_TYPE:
        _params['verity_type'] = VERITY_TYPE[act]
    else:
        _params['verity_type'] = 'hash'

    for option, value in pairs[1:]:
        # The first two characters are dropped without checking what they are.
        key = option.strip()[2:]
        if key not in _params:
            print("[hvbtool][ERROR]: Unknown argument: %s" % option)
            sys.exit(1)

        if key == "chain_partition":
            _params[key].append(value.strip())
        else:
            _params[key] = value.strip()

    if _params['salt'] is None:
        _params['salt'] = create_random_data()
    return act
1515
1516
def necessary_arguments_check(check_list):
    """Check that every argument named in *check_list* has a value in ``_params``.

    An argument counts as missing when its stored value is ``None`` or has
    length zero. The first missing argument is reported and checking stops.

    Args:
        check_list: iterable of ``_params`` keys that must be set.

    Returns:
        True when all listed arguments are present, otherwise False.
    """
    for name in check_list:
        value = _params[name]
        if value is not None and len(value) != 0:
            continue

        print("[hvbtool][ERROR]: The argument '{}' is necessary.".format(name))
        return False

    return True
1524
1525
def check_privkey_arg(act):
    """Require a signing key source for the image-making actions.

    For ``make_hash_footer``, ``make_hashtree_footer`` and ``make_rvt_image``
    either ``privkey`` or ``signing_helper_with_files`` must be set in
    ``_params``; otherwise an error is printed and the process exits with
    status 1. Any other action is accepted without checks.
    """
    signing_actions = ('make_hash_footer', 'make_hashtree_footer', 'make_rvt_image')
    if act not in signing_actions:
        return

    if _params['privkey'] is not None or _params['signing_helper_with_files'] is not None:
        return

    print("[hvbtool][ERROR]: The argument 'privkey' is necessary.")
    sys.exit(1)
1531
1532
def check_arguments(act):
    """Validate that the arguments required by action *act* were supplied.

    Each known action has a list of ``_params`` keys that must be set. After
    those are confirmed, the signing-key requirement is checked through
    ``check_privkey_arg``.

    The process exits with status 1 when *act* is not a known action or when
    a required argument is missing.
    """
    make_image_arguments_list = ['image', 'algorithm', 'pubkey', 'rollback_index', 'rollback_location']
    make_rvt_image_arguments_list = ['algorithm', 'pubkey', 'rollback_index', 'rollback_location', 'chain_partition']
    parse_erase_image_arguments_list = ['image']
    verify_image_arguments_list = ['image', 'pubkey']

    required_by_action = {
        'make_hash_footer': make_image_arguments_list,
        'make_hashtree_footer': make_image_arguments_list,
        'make_rvt_image': make_rvt_image_arguments_list,
        'parse_image': parse_erase_image_arguments_list,
        'erase_image': parse_erase_image_arguments_list,
        'verify_image': verify_image_arguments_list,
    }

    required = required_by_action.get(act)
    if required is None:
        print("[hvbtool][ERROR]: Unknown action: {}".format(act))
        sys.exit(1)

    if not necessary_arguments_check(required):
        sys.exit(1)

    check_privkey_arg(act)
1555
1556
def main(obj, act):
    """Call the method of *obj* that implements the action *act*.

    Args:
        obj: object providing the ``hvb_*`` action methods.
        act: action name as returned by ``parse_arguments``.

    Raises:
        HvbError: if *act* is not one of the supported actions.
    """
    method_by_action = {
        'make_hash_footer': 'hvb_make_image',
        'make_hashtree_footer': 'hvb_make_image',
        'parse_image': 'hvb_parse_image',
        'make_rvt_image': 'hvb_make_rvt_image',
        'erase_image': 'hvb_erase_image',
        'verify_image': 'hvb_verify_image',
    }

    method_name = method_by_action.get(act)
    if method_name is None:
        raise HvbError("Unknown action: {}".format(act))

    # Look the method up only after the action is known to be valid.
    getattr(obj, method_name)()
1570
if __name__ == '__main__':
    # Script entry: parse and validate the command line, run the requested
    # action, and report failure through the exit status.
    action = parse_arguments(sys.argv)
    check_arguments(action)
    tool = HvbTool()

    exit_status = 0
    try:
        main(tool, action)
    except (HvbError, struct.error):
        print("HVB COMMAND FAILED!")
        exit_status = 1
    finally:
        # Close the tool's image handle whether or not the action succeeded.
        open_handler = tool.image_handle.img_handler
        if open_handler:
            open_handler.close()
    sys.exit(exit_status)
1584