1#!/usr/bin/env python3 2# -*- coding: UTF-8 -*- 3''' 4 * Copyright (c) 2022-2023 Huawei Device Co., Ltd. 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16''' 17 18import binascii 19import hashlib 20import os 21import re 22import stat 23import struct 24import subprocess 25import sys 26import bisect 27import shutil 28import random 29 30 31_params = { 32 'partition': None, \ 33 'partition_size': 0, \ 34 'image': None, \ 35 'verity_type': 'hash', \ 36 'algorithm': 'SHA256_RSA3072', \ 37 'userid': None, \ 38 'rollback_location': None, \ 39 'rollback_index': None, \ 40 'salt': None, \ 41 'pubkey': None, \ 42 'privkey': None, \ 43 'hash_algo': 'SHA256', \ 44 'block_size': 4096, \ 45 'fec_num_roots': 0, \ 46 'pubkey_num_per_ptn': 1, \ 47 'padding_size': None, \ 48 'chain_partition': [], \ 49 'signing_helper_with_files': None, \ 50 'calc_max_image_size': 'false', \ 51 'output': None 52} 53 54 55VERITY_TYPE = { 56 'make_hash_footer': 'hash', \ 57 'make_hashtree_footer': 'hashtree' 58} 59 60 61class Algorithm(object): 62 def __init__(self, sig_algo, hash_algo, bit_length, sig_bytes, hash_bytes, pubkey_bytes): 63 self.sig_algo = sig_algo 64 self.hash_algo = hash_algo 65 self.bit_length = bit_length 66 self.sig_bytes = sig_bytes 67 self.hash_bytes = hash_bytes 68 self.pubkey_bytes = pubkey_bytes 69 70 71ALGORITHMS = {'SHA256_RSA3072': Algorithm( 72 sig_algo=0, 73 hash_algo='sha256', 74 bit_length=3072, 75 sig_bytes=384, 76 hash_bytes=32, 77 pubkey_bytes=8 + 2 * 3072 
// 8 78 ), \ 79 'SHA256_RSA4096': Algorithm( 80 sig_algo=1, 81 hash_algo='sha256', 82 bit_length=4096, 83 sig_bytes=512, 84 hash_bytes=32, 85 pubkey_bytes=8 + 2 * 4096 // 8 86 ), \ 87 'SHA256_RSA2048': Algorithm( 88 sig_algo=2, 89 hash_algo='sha256', 90 bit_length=2048, 91 sig_bytes=256, 92 hash_bytes=32, 93 pubkey_bytes=8 + 2 * 2048 // 8 94 ), \ 95 'SM2': Algorithm( 96 sig_algo=3, 97 hash_algo='SM3', 98 bit_length=256, # bit 99 sig_bytes=64, 100 hash_bytes=32, 101 pubkey_bytes=64 102 ) \ 103} 104 105HASH_ALGORITHM = { 106 'sha256' : 0, 107 'sha128' : 1, 108 'sha512' : 2, 109 'SM3' : 3 110} 111 112 113def round_to_multiple(num, size): 114 remainder = num % size 115 if remainder == 0: 116 return num 117 return num + size - remainder 118 119 120def create_random_data(length=64): 121 data = '' 122 chars = '0123456789abcdef' 123 len_chars = len(chars) - 1 124 for i in range(length): 125 data += chars[random.randint(0, len_chars)] 126 return data 127 128 129def encode_long(num_bits, value): 130 ret = bytearray() 131 for bit_pos in range(num_bits, 0, -8): 132 octet = (value >> (bit_pos - 8)) & 0xff 133 ret.extend(struct.pack('!B', octet)) 134 return ret 135 136 137def decode_long(blob): 138 ret = 0 139 for b in bytearray(blob): 140 ret *= 256 141 ret += b 142 return ret 143 144 145def calc_hashtree_size(size, block_size, digest_size): 146 level_offsets = [] 147 level_sizes = [] 148 149 treesize = 0 150 levels = 0 151 152 while size > block_size: 153 blocknum = size // block_size 154 level_size = round_to_multiple(blocknum * digest_size, block_size) 155 level_sizes.append(level_size) 156 treesize += level_size 157 levels += 1 158 size = level_size 159 for i in range(levels): 160 offset = 0 161 for j in range(i + 1, levels): 162 offset += level_sizes[j] 163 level_offsets.append(offset) 164 165 return level_offsets, treesize 166 167 168def get_sm3_digest(handle): 169 cmds = ['openssl', 'dgst', '-SM3', handle] 170 p = subprocess.Popen(cmds, stdin=subprocess.PIPE, 
stdout=subprocess.PIPE, stderr=subprocess.PIPE) 171 (pout, perr) = p.communicate(timeout=10) 172 retcode = p.wait() 173 if retcode != 0: 174 raise HvbError("Failed to calculate the hash using SM3.") 175 return binascii.a2b_hex(pout.split(b'=')[-1].strip()) 176 177 178def get_sha256_digest(hash_algo, salt, content): 179 hasher = hashlib.new(hash_algo, salt) 180 hasher.update(content) 181 return hasher.digest() 182 183 184def generate_sha_hash_tree(image, image_size, block_size, hash_algo, salt, hash_level_offsets, tree_size): 185 hash_ret = bytearray(tree_size) 186 hash_src_offset = 0 187 hash_src_size = image_size 188 level_num = 0 189 190 while hash_src_size > block_size: 191 level_output_list = [] 192 remaining = hash_src_size 193 while remaining > 0: 194 hasher = hashlib.new(hash_algo, salt) 195 if level_num == 0: 196 begin = hash_src_offset + hash_src_size - remaining 197 end = min(remaining, block_size) 198 image.seek(begin) 199 data = image.read(end) 200 else: 201 offset = hash_level_offsets[level_num - 1] + hash_src_size - remaining 202 data = hash_ret[offset : offset + block_size] 203 hasher.update(data) 204 205 remaining -= len(data) 206 if len(data) < block_size: 207 hasher.update(b'\0' * (block_size - len(data))) 208 level_output_list.append(hasher.digest()) 209 210 level_output = b''.join(level_output_list) 211 level_output_padding = round_to_multiple(len(level_output), block_size) - len(level_output) 212 level_output += b'\0' * level_output_padding 213 214 offset = hash_level_offsets[level_num] 215 hash_ret[offset : offset + len(level_output)] = level_output 216 217 hash_src_size = len(level_output) 218 level_num += 1 219 220 hasher = hashlib.new(hash_algo, salt) 221 hasher.update(level_output) 222 223 return hasher.digest(), bytes(hash_ret) 224 225 226def generate_sm3_hash_tree(image, image_size, block_size, hash_level_offsets, tree_size): 227 hash_ret = bytearray(tree_size) 228 hash_src_offset = 0 229 hash_src_size = image_size 230 level_num = 0 231 
tmp = 'hash_{}.bin'.format(os.getpid()) 232 233 while hash_src_size > block_size: 234 level_output_list = [] 235 remaining = hash_src_size 236 while remaining > 0: 237 hasher = b'' 238 if level_num == 0: 239 begin = hash_src_offset + hash_src_size - remaining 240 end = min(remaining, block_size) 241 image.seek(begin) 242 data = image.read(end) 243 else: 244 offset = hash_level_offsets[level_num - 1] + hash_src_size - remaining 245 data = hash_ret[offset : offset + block_size] 246 hasher += data 247 248 remaining -= len(data) 249 if len(data) < block_size: 250 hasher += b'\0' * (block_size - len(data)) 251 252 if os.path.exists(tmp): 253 os.remove(tmp) 254 255 fd = open(tmp, 'wb') 256 fd.write(hasher) 257 fd.close() 258 259 dgst = get_sm3_digest(tmp) 260 level_output_list.append(dgst) 261 262 level_output = b''.join(level_output_list) 263 level_output_padding = round_to_multiple(len(level_output), block_size) - len(level_output) 264 level_output += b'\0' * level_output_padding 265 266 offset = hash_level_offsets[level_num] 267 hash_ret[offset : offset + len(level_output)] = level_output 268 269 hash_src_size = len(level_output) 270 level_num += 1 271 272 if os.path.exists(tmp): 273 os.remove(tmp) 274 275 fd = open(tmp, 'wb') 276 fd.write(level_output) 277 fd.close() 278 279 root_digest = get_sm3_digest(tmp) 280 281 return root_digest, bytes(hash_ret) 282 283 284class HvbFooter(object): 285 FOOTERMAGIC = b'HVB' + b'\0' * 5 286 FOOTER_FORMAT = ('8s' # Magic 287 '4Q' # Cert offset, Cert size, Image_size, Partition_size 288 '64s' # Reserved 289 ) 290 291 def __init__(self, footer=None): 292 self.foot = footer 293 if self.foot: 294 (self.magic, self.certoffset, self.certsize, self.imagesize, 295 self.partition_size, _) = struct.unpack(self.FOOTER_FORMAT, footer) 296 if self.magic != self.FOOTERMAGIC: 297 raise HvbError('Given footer does not look like a HVB footer.') 298 else: 299 raise HvbError('Given footer is None.') 300 301 def info_footer(self): 302 msg = "[HVB 
footer]: \n" 303 if self.foot: 304 msg += "\tMaigc: {}\n".format((self.magic).decode()) 305 msg += "\tCert offset: {} bytes\n".format(hex(self.certoffset)) 306 msg += "\tCert size: {} bytes\n".format(self.certsize) 307 msg += "\tImage size: {} bytes\n".format(self.imagesize) 308 msg += "\tPartition size: {} bytes\n\n".format(self.partition_size) 309 else: 310 msg += "There is no footer. \n\n" 311 print(msg) 312 313 314class HvbCert(object): 315 CERTMAGIC = b'HVB\0' 316 ALGORITHM_TYPES = {0 : 'SHA256_RSA3072', 317 1 : 'SHA256_RSA4096', 318 2 : 'SHA256_RSA2048', 319 3 : 'SM2' 320 } 321 322 HASH_ALGORITHMS = {0 : 'SHA256', 323 1 : 'SHA128', 324 2 : 'SHA512', 325 3 : 'SM3' 326 } 327 328 def __init__(self, cert=None): 329 self.cert = cert 330 331 flags = os.O_WRONLY | os.O_CREAT 332 modes = stat.S_IWUSR | stat.S_IRUSR 333 with os.fdopen(os.open('cert.bin', flags, modes), 'wb') as cert_fd: 334 cert_fd.write(self.cert) 335 336 if self.cert: 337 self.mgc, self.major, self.minor = struct.unpack('4s2I', self.cert[0:12]) 338 self.version = '{}.{}'.format(self.major, self.minor) 339 if self.mgc != self.CERTMAGIC: 340 raise HvbError('Given cert does not look like a HVB cert.') 341 self.img_org_len, self.img_len, self.partition = struct.unpack('2Q64s', self.cert[48:128]) 342 self.rollback_location, self.rollback_index = struct.unpack('2Q', self.cert[128:144]) 343 344 verity, self.hash_algo = struct.unpack('2I', self.cert[144:152]) 345 self.verity_type = 'hash' if verity == 1 else 'hashtree' 346 self.salt_offset, self.salt_size = struct.unpack('2Q', self.cert[152:168]) 347 348 self.digest_offset, self.digest_size, self.hashtree_offset, self.hashtree_size, \ 349 self.data_block_size, self.hash_block_size, self.fec_num_roots, \ 350 self.fec_offset, self.fec_size = struct.unpack('9Q', self.cert[168:240]) 351 self.salt = struct.unpack('{}s'.format(self.salt_size), self.cert[240:240 + self.salt_size]) 352 self.digest = struct.unpack('{}s'.format(self.digest_size), \ 353 self.cert[240 
+ self.salt_size : 240 + self.salt_size + self.digest_size]) 354 hash_payload_size = self.salt_size + self.digest_size 355 signature_info_offset = 240 + hash_payload_size 356 self.algo, self.flags, self.key_offset, self.key_len = struct.unpack('2I2Q', \ 357 self.cert[signature_info_offset + 8 : signature_info_offset + 8 + 24]) 358 self.sig_content_offset, self.sig_content_bytes = struct.unpack('2Q', \ 359 self.cert[signature_info_offset + 32 : signature_info_offset + 32 + 16]) 360 self.userid_offset, self.userid_len = struct.unpack('QI', \ 361 self.cert[signature_info_offset + 48 : signature_info_offset + 48 + 12]) 362 print("userid_offset: {}, userid_len: {}".format(self.userid_offset, self.userid_len)) 363 self.key = self.cert[signature_info_offset + 112 : signature_info_offset + 112 + self.key_len] 364 self.user_id = self.cert[signature_info_offset + 112 + self.key_len + self.sig_content_bytes : \ 365 signature_info_offset + 112 + self.key_len + self.sig_content_bytes + self.userid_len] 366 367 else: 368 raise HvbError('Given cert is None.') 369 370 def info_cert(self): 371 msg = "[HVB cert]: \n" 372 if self.cert: 373 msg += "\tHVB tool version: hvb tool {}\n".format(self.version) 374 msg += "\tOriginal Image length: {} bytes\n".format(self.img_org_len) 375 msg += "\tImage length: {} bytes (4K alignment)\n".format(self.img_len) 376 msg += "\tPartition name: {}\n".format(self.partition.decode()) 377 msg += "\tverity type(hash/hashtree): {}\n".format(self.verity_type) 378 msg += "\tsalt size: {} bytes\n".format(self.salt_size) 379 if self.hash_algo not in self.HASH_ALGORITHMS: 380 raise HvbError("Unknown hash algorithm: {}".format(self.hash_algo)) 381 msg += "\tHash algorithm: {}\n".format(self.HASH_ALGORITHMS[self.hash_algo]) 382 msg += "\tdigest size: {} bytes\n".format(self.digest_size) 383 msg += "\thashtree size: {}\n".format(self.hashtree_size) 384 msg += "\tfec size: {}\n".format(self.fec_size) 385 msg += "\thashpayload:\n" 386 msg += "\t\tsalt: 
{}\n".format((binascii.b2a_hex(self.salt[0]).decode())) 387 msg += "\t\tdigest: {}\n".format((binascii.b2a_hex(self.digest[0]).decode())) 388 if self.hashtree_size != 0: 389 msg += "\t\thashtree offset: 0x{:x}\n".format(self.hashtree_offset) 390 if self.fec_size != 0: 391 msg += "\t\tfec offset: 0x{:x}\n".format(self.fec_offset) 392 if self.algo not in self.ALGORITHM_TYPES: 393 raise HvbError("Unknown algorithm type: {}".format(self.algo)) 394 msg += "\tAlgorithm: {}\n".format(self.ALGORITHM_TYPES[self.algo]) 395 msg += "\tUser id: {}\n".format(self.user_id.decode()) 396 if self.ALGORITHM_TYPES[self.algo] == 'SM2': 397 msg += "\tPublic key: {}\n\n".format(binascii.b2a_hex(self.key).decode()) 398 else: 399 msg += "\tPublic key (sha256): {}\n\n".format(hashlib.sha256(self.key).hexdigest()) 400 else: 401 msg += 'There is no certificate.\n\n' 402 print(msg) 403 404 def verify_signature(self, pubkey): 405 if self.algo not in self.ALGORITHM_TYPES: 406 raise HvbError("Unknown algorithm: {}".format(self.algo)) 407 algorithm_type = self.ALGORITHM_TYPES.get(self.algo) 408 if algorithm_type not in ALGORITHMS: 409 raise HvbError("Unknown algorithm type: {}".format(algorithm_type)) 410 algorithm = ALGORITHMS.get(algorithm_type) 411 to_be_sig_content = self.cert[0 : self.sig_content_offset] 412 sig_content = self.cert[self.sig_content_offset : self.sig_content_offset + self.sig_content_bytes] 413 414 to_be_sig_content_bin = os.sep.join([os.getcwd(), 'tmp.{0}.bin'.format(os.getpid())]) 415 sig_content_bin = os.sep.join([os.getcwd(), 'sigcontent.{0}.bin'.format(os.getpid())]) 416 flags = os.O_RDONLY | os.O_WRONLY | os.O_CREAT 417 modes = stat.S_IWUSR | stat.S_IRUSR 418 with os.fdopen(os.open(to_be_sig_content_bin, flags, modes), 'wb') as tmp_fd: 419 tmp_fd.write(to_be_sig_content) 420 with os.fdopen(os.open(sig_content_bin, flags, modes), 'wb') as tmp_fd: 421 tmp_fd.write(sig_content) 422 cmds = ['openssl', 'dgst', '-verify', pubkey, '-sigopt', 'rsa_padding_mode:pss', 423 
'-sigopt', 'rsa_pss_saltlen:32', '-sha256', '-signature', sig_content_bin, to_be_sig_content_bin] 424 p = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 425 (pout, perr) = p.communicate(timeout=10) 426 retcode = p.wait() 427 if os.path.exists(sig_content_bin): 428 os.remove(sig_content_bin) 429 if os.path.exists(to_be_sig_content_bin): 430 os.remove(to_be_sig_content_bin) 431 print(pout.decode().strip()) 432 if retcode != 0: 433 raise HvbError('Error verifying data: {}'.format(perr)) 434 if pout.decode().strip() != 'Verified OK': 435 sys.stderr.write('Signature not correct\n') 436 return False 437 return True 438 439 def verify_digest(self, image): 440 image_size = self.img_org_len 441 hash_algo = self.HASH_ALGORITHMS.get(self.hash_algo) 442 salt = self.salt[0] 443 if self.verity_type == 'hash': 444 image.seek(0) 445 image_content = image.read(image_size) 446 hasher = hashlib.new(hash_algo, salt) 447 hasher.update(image_content) 448 digest = hasher.digest() 449 elif self.verity_type == 'hashtree': 450 hashtree_hasher = hashlib.new(hash_algo, salt) 451 digest_size = len(hashtree_hasher.digest()) 452 digest_padding = 2 ** ((digest_size - 1).bit_length()) - digest_size 453 remainder = image.block_size - image.img_size % image.block_size 454 if remainder != image.block_size: 455 image.append_raw(b'\0' * remainder) 456 level_offsets, tree_size = calc_hashtree_size(image_size, image.block_size, digest_size) 457 if hash_algo == 'SM3': 458 digest, hashtree = generate_sm3_hash_tree(image, image_size, image.block_size, 459 hash_algo, level_offsets, tree_size) 460 else: 461 digest, hashtree = generate_sha_hash_tree(image, image_size, image.block_size, 462 hash_algo, salt, level_offsets, tree_size) 463 else: 464 print("[hvbtool][ERROR]: Invalid verity_type: %d", self.vertype) 465 return False 466 if self.digest[0] and digest != self.digest[0]: 467 return False 468 return True 469 470 471class SMPublicKey(object): 472 def 
__init__(self, pubkey): 473 self.privkey = pubkey 474 self.pubkey = b'' 475 476 cmds = ['openssl', 'ec', '-pubin', '-in', pubkey, '-text'] 477 process = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 478 try: 479 (out, err) = process.communicate(timeout=10) 480 except subprocess.TimeoutExpired: 481 process.kill() 482 raise HvbError("Get public key timeout!") 483 if process.wait() != 0: 484 raise HvbError("Failed to get public key: {}".format(err)) 485 486 self.key_data = out.split(b'\n') 487 self.pubkey_idx = self.key_data.index(b'pub:') + 1 488 489 def get_public_key(self): 490 for i in range(self.pubkey_idx, self.pubkey_idx + 5): # The size of public key is 64 bytes, occupies 5 lines. 491 self.pubkey += b''.join(self.key_data[i].strip().split(b':')) 492 493 return binascii.a2b_hex(self.pubkey[2:]) # The prefix occupies two bytes. 494 495 496class RSAPublicKey(object): 497 MODULUS_PREFIX = b'modulus=' 498 BIT_LENGTH_KEYWORD = b'Public-Key:' 499 500 def __init__(self, pubkey): 501 self.pubkey = pubkey 502 self.modulus_bits = self.get_bit_length(self.pubkey) 503 cmds = ['openssl', 'rsa', '-in', pubkey, '-modulus', '-noout', '-pubin'] 504 process = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 505 try: 506 (out, err) = process.communicate(timeout=10) 507 except subprocess.TimeoutExpired: 508 process.kill() 509 raise HvbError("Get public key timeout!") 510 if process.wait() != 0: 511 raise HvbError("Failed to get public key: {}".format(err)) 512 513 if not out.lower().startswith(self.MODULUS_PREFIX): 514 raise HvbError('Invalid modulus') 515 516 self.modulus = out[len(self.MODULUS_PREFIX):].strip() 517 self.modulusdata = int(self.modulus, 16) 518 self.exponent = 65537 519 520 def get_bit_length(self, pubkey): 521 bitlen = 0 522 cmd = ['openssl', 'rsa', '-inform', 'PEM', '-in', pubkey, '-pubin', '-text'] 523 child = subprocess.Popen(cmd, stdin=subprocess.PIPE, 
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 524 lines = child.stdout.read().split(b'\n') 525 for line in lines: 526 if self.BIT_LENGTH_KEYWORD in line: 527 bitlen = int(re.findall(b'\d+', line.split(self.BIT_LENGTH_KEYWORD)[-1])[0]) 528 break 529 return bitlen 530 531 def calc_egcd(self, num1, num2): 532 if num1 == 0: 533 return (num2, 0, 1) 534 egcd_g, egcd_y, egcd_x = self.calc_egcd(num2 % num1, num1) 535 return (egcd_g, egcd_x - (num2 // num1) * egcd_y, egcd_y) 536 537 def calc_modinv(self, num1, modulo): 538 modinv_gcd, modinv_x, _ = self.calc_egcd(num1, modulo) 539 if modinv_gcd != 1: 540 raise HvbError("modular inverse does not exist.") 541 return modinv_x % modulo 542 543 def get_public_key(self): 544 pkey_n0 = 2 ** 64 - self.calc_modinv(self.modulusdata, 2 ** 64) 545 pkey_r = 2 ** self.modulusdata.bit_length() 546 pkey_prr = bytes(encode_long(self.modulus_bits, pkey_r * pkey_r % self.modulusdata)) 547 modulus = bytes(encode_long(self.modulus_bits, self.modulusdata)) 548 549 return struct.pack('!QQ', self.modulus_bits, pkey_n0) + modulus + pkey_prr 550 551 552class HvbError(Exception): 553 def __init__(self, message): 554 print("[HvbError]: " + message) 555 Exception.__init__(self, message) 556 557 558class ImageChunk(object): 559 CHUNK_HEADER_FORMAT = '<2H2I' 560 CHUNK_HEADER_SIZE = struct.calcsize(CHUNK_HEADER_FORMAT) 561 CHUNK_TYPE_RAW = 0xcac1 562 CHUNK_TYPE_FILL = 0xcac2 563 CHUNK_TYPE_DONT_CARE = 0xcac3 564 CHUNK_TYPE_CRC32 = 0xcac4 565 566 def __init__(self, chunk_type, chunk_offset, nsparsed_output_offset, 567 nsparsed_output_size, input_offset, fill_data): 568 """Initializes an ImageChunk object. 569 570 Arguments: 571 chunk_type: One of TYPE_RAW, TYPE_FILL, or TYPE_DONT_CARE. 572 chunk_offset: Offset in the sparse file where this chunk begins. 573 output_offset: Offset in de-sparsified file. 574 output_size: Number of bytes in output. 575 input_offset: Offset in sparse file where the chunk data begins if TYPE_RAW otherwise None. 
576 fill_data: Blob as bytes with data to fill if TYPE_FILL otherwise None. 577 """ 578 579 self.chunk_type = chunk_type 580 self.chunk_offset = chunk_offset 581 self.nsparsed_chunk_offset = nsparsed_output_offset 582 self.nsparsed_output_size = nsparsed_output_size 583 self.sparsed_input_offset = input_offset 584 self.fill_data = fill_data 585 # Check invariants. 586 if self.chunk_type == self.CHUNK_TYPE_RAW: 587 if self.fill_data is not None: 588 raise HvbError('RAW chunk cannot have fill_data set.') 589 if not self.sparsed_input_offset: 590 raise HvbError('RAW chunk must have input_offset set.') 591 elif self.chunk_type == self.CHUNK_TYPE_FILL: 592 if self.fill_data is None: 593 raise HvbError('FILL chunk must have fill_data set.') 594 if self.sparsed_input_offset: 595 raise HvbError('FILL chunk cannot have input_offset set.') 596 elif self.chunk_type == self.CHUNK_TYPE_DONT_CARE: 597 if self.fill_data is not None: 598 raise HvbError('DONT_CARE chunk cannot have fill_data set.') 599 if self.sparsed_input_offset: 600 raise HvbError('DONT_CARE chunk cannot have input_offset set.') 601 else: 602 raise HvbError('Invalid chunk type') 603 604 605class ImageHandle(object): 606 #Descriptions of sparse image 607 SIMAGE_MAGIC = 0xed26ff3a 608 SIMAGE_HEADER_FORMAT = '<I4H4I' 609 SIMAGE_HEADER_SIZE = struct.calcsize(SIMAGE_HEADER_FORMAT) 610 611 def __init__(self, file_name): 612 self.image_file = file_name 613 self.is_sparse = False 614 self.block_size = 4096 # A block size is 4096 bytes. 
615 self.total_blks = 0 616 self.total_chunks = 0 617 self.header_analyze() 618 619 def header_analyze(self): 620 self.img_handler = open(self.image_file, 'r+b') 621 self.img_handler.seek(0, os.SEEK_END) 622 self.img_size = self.img_handler.tell() 623 print("Initial image length: ", self.img_size) 624 625 self.img_handler.seek(0, os.SEEK_SET) 626 header = self.img_handler.read(self.SIMAGE_HEADER_SIZE) 627 628 """ Sparse header 629 magic 0xed26ff3a 630 major_version (0x1) - reject images with higher major versions 631 minor_version (0x0) - allow images with higer minor versions 632 file_hdr_sz 28 bytes for first revision of the file format 633 chunk_hdr_sz 12 bytes for first revision of the file format 634 blk_sz block size in bytes, must be a multiple of 4 (4096) 635 total_blks total blocks in the non-sparse output image 636 total_chunks total chunks in the sparse input image 637 image_checksum CRC32 checksum of the original data, counting "don't care 638 as 0. Standard 802.3 polynomial, use a Public Domain 639 table implementation 640 """ 641 (magic, major_version, minor_version, file_hdr_sz, 642 chunk_hdr_sz, block_size, self.total_blks, self.total_chunks, 643 img_checksum) = struct.unpack(self.SIMAGE_HEADER_FORMAT, header) 644 if magic != self.SIMAGE_MAGIC: 645 return 646 647 self.block_size = block_size 648 print("It's a sparse image.") 649 if self.SIMAGE_HEADER_SIZE != file_hdr_sz: 650 raise HvbError("Incorrect sparse image header size: {}".format(file_hdr_sz)) 651 if ImageChunk.CHUNK_HEADER_SIZE != chunk_hdr_sz: 652 raise HvbError("Incorrect chunk header size: {}".format(chunk_hdr_sz)) 653 654 self.chunks = list() 655 nsparsed_output_offset, nsparsed_chunk_nums = self.chunk_analyze() 656 self.sparse_end = self.img_handler.tell() 657 658 if self.total_blks != nsparsed_chunk_nums: 659 raise HvbError("The header said we should have {} output blocks, " 660 'but we saw {}'.format(self.total_blks, nsparsed_chunk_nums)) 661 if self.sparse_end != self.img_size: 662 
raise HvbError("There were {} bytes of extra data at the end of the " 663 "file.".format(self.img_size - self.sparse_end)) 664 665 self.nsparsed_chunk_offset_list = [c.nsparsed_chunk_offset for c in self.chunks] 666 self.is_sparse = True 667 self.img_size = nsparsed_output_offset 668 669 def chunk_analyze(self): 670 nsparsed_output_offset = 0 671 nsparsed_chunk_nums = 0 672 673 for i in range(self.total_chunks): 674 chunk_offset = self.img_handler.tell() 675 676 """ chunk header 677 chunk_type Type of this chunk (Raw, Fill, Dont care, CRC32) 678 chunk_sz Size of the chunk before the sparse(The unit is blk_sz, that is, 4096) 679 total_sz The size of the chunk after sparse, includes chunk_header and chunk data. 680 """ 681 chunk_header = self.img_handler.read(ImageChunk.CHUNK_HEADER_SIZE) 682 (chunk_type, _, chunk_sz, total_sz) = struct.unpack(ImageChunk.CHUNK_HEADER_FORMAT, chunk_header) 683 chunk_data_size = total_sz - ImageChunk.CHUNK_HEADER_SIZE 684 685 if chunk_type == ImageChunk.CHUNK_TYPE_RAW: 686 if chunk_data_size != (chunk_sz * self.block_size): 687 raise HvbError("Raw chunk size ({}) does not match output " 688 "size ({})".format(chunk_data_size, (chunk_sz * self.block_size))) 689 self.chunks.append(ImageChunk(ImageChunk.CHUNK_TYPE_RAW, chunk_offset, 690 nsparsed_output_offset, chunk_sz * self.block_size, 691 self.img_handler.tell(), None)) 692 self.img_handler.seek(chunk_data_size, os.SEEK_CUR) 693 elif chunk_type == ImageChunk.CHUNK_TYPE_FILL: 694 if chunk_data_size != 4: 695 raise HvbError("Fill chunk should have 4 bytes of fill, but this " 696 "has {}".format(chunk_data_size)) 697 fill_data = self.img_handler.read(4) 698 self.chunks.append(ImageChunk(ImageChunk.CHUNK_TYPE_FILL, chunk_offset, 699 nsparsed_output_offset, chunk_sz * self.block_size, 700 None, fill_data)) 701 elif chunk_type == ImageChunk.CHUNK_TYPE_DONT_CARE: 702 if chunk_data_size != 0: 703 raise HvbError("Don\'t care chunk input size is non-zero ({})". 
704 format(chunk_data_size)) 705 self.chunks.append(ImageChunk(ImageChunk.CHUNK_TYPE_DONT_CARE, chunk_offset, 706 nsparsed_output_offset, chunk_sz * self.block_size, 707 None, None)) 708 elif chunk_type == ImageChunk.CHUNK_TYPE_CRC32: 709 if chunk_data_size != 4: 710 raise HvbError("CRC32 chunk should have 4 bytes of CRC, but " 711 "this has {}".format(chunk_data_size)) 712 self.img_handler.read(4) 713 else: 714 raise HvbError("Unknown chunk type: {}".format(chunk_type)) 715 716 nsparsed_chunk_nums += chunk_sz 717 nsparsed_output_offset += chunk_sz * self.block_size 718 719 return nsparsed_output_offset, nsparsed_chunk_nums 720 721 def update_chunks_and_blocks(self): 722 """Helper function to update the image header. 723 724 The the |total_chunks| and |total_blocks| fields in the header 725 will be set to value of the |_num_total_blocks| and 726 |_num_total_chunks| attributes. 727 728 """ 729 num_chunks_and_blocks_offset = 16 730 num_chunks_and_blocks_format = '<II' 731 self.img_handler.seek(num_chunks_and_blocks_offset, os.SEEK_SET) 732 self.img_handler.write(struct.pack(num_chunks_and_blocks_format, 733 self.total_blks, self.total_chunks)) 734 735 def append_dont_care(self, num_bytes): 736 """Appends a DONT_CARE chunk to the sparse file. 737 738 The given number of bytes must be a multiple of the block size. 739 740 Arguments: 741 num_bytes: Size in number of bytes of the DONT_CARE chunk. 742 743 """ 744 if num_bytes % self.block_size != 0: 745 raise HvbError("Given number of bytes must be a multiple of the block size.") 746 747 if not self.is_sparse: 748 self.img_handler.seek(0, os.SEEK_END) 749 # This is more efficient that writing NUL bytes since it'll add 750 # a hole on file systems that support sparse files (native 751 # sparse, not OpenHarmony sparse). 
752 self.img_handler.truncate(self.img_handler.tell() + num_bytes) 753 self.header_analyze() 754 return 755 756 self.total_chunks += 1 757 self.total_blks += num_bytes // self.block_size 758 self.update_chunks_and_blocks() 759 760 self.img_handler.seek(self.sparse_end, os.SEEK_SET) 761 self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT, 762 ImageChunk.CHUNK_TYPE_DONT_CARE, 763 0, # Reserved 764 num_bytes // self.block_size, 765 struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT))) 766 self.header_analyze() 767 768 def append_raw(self, data): 769 """Appends a RAW chunk to the sparse file. 770 771 The length of the given data must be a multiple of the block size. 772 773 Arguments: 774 data: Data to append as bytes. 775 776 """ 777 print("SELF>BLOCK_SIZE: ", self.block_size) 778 if len(data) % self.block_size != 0: 779 raise HvbError("Given data must be a multiple of the block size.") 780 781 if not self.is_sparse: 782 self.img_handler.seek(0, os.SEEK_END) 783 self.img_handler.write(data) 784 self.header_analyze() 785 return 786 787 self.total_chunks += 1 788 self.total_blks += len(data) // self.block_size 789 self.update_chunks_and_blocks() 790 791 self.img_handler.seek(self.sparse_end, os.SEEK_SET) 792 self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT, 793 ImageChunk.CHUNK_TYPE_RAW, 794 0, # Reserved 795 len(data) // self.block_size, 796 len(data) + 797 struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT))) 798 self.img_handler.write(data) 799 self.header_analyze() 800 801 def append_fill(self, fill_data, size): 802 """Appends a fill chunk to the sparse file. 803 804 The total length of the fill data must be a multiple of the block size. 805 806 Arguments: 807 fill_data: Fill data to append - must be four bytes. 808 size: Number of chunk - must be a multiple of four and the block size. 
809 810 """ 811 if len(fill_data) != 4 or size % 4 != 0 or size % self.block_size != 0: 812 raise HvbError("The total length of the fill data must be a multiple of the block size.") 813 814 if not self.is_sparse: 815 self.img_handler.seek(0, os.SEEK_END) 816 self.img_handler.write(fill_data * (size // 4)) 817 self.header_analyze() 818 return 819 820 self.total_chunks += 1 821 self.total_blks += size // self.block_size 822 self.update_chunks_and_blocks() 823 824 self.img_handler.seek(self.sparse_end, os.SEEK_SET) 825 self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT, 826 ImageChunk.CHUNK_TYPE_FILL, 827 0, # Reserved 828 size // self.block_size, 829 4 + struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT))) 830 self.img_handler.write(fill_data) 831 self.header_analyze() 832 833 def seek(self, offset): 834 """Sets the cursor position for reading from unsparsified file. 835 836 Arguments: 837 offset: Offset to seek to from the beginning of the file. 838 839 Raises: 840 RuntimeError: If the given offset is negative. 841 """ 842 if offset < 0: 843 raise RuntimeError('Seeking with negative offset: {}'.format(offset)) 844 self._file_pos = offset 845 846 def read(self, size): 847 """Reads data from the unsparsified file. 848 849 This method may return fewer than |size| bytes of data if the end 850 of the file was encountered. 851 852 The file cursor for reading is advanced by the number of bytes 853 read. 854 855 Arguments: 856 size: Number of bytes to read. 857 858 Returns: 859 The data as bytes. 860 """ 861 if not self.is_sparse: 862 self.img_handler.seek(self._file_pos) 863 data = self.img_handler.read(size) 864 self._file_pos += len(data) 865 return data 866 867 # Iterate over all chunks. 
def truncate(self, size):
    """Truncates (or grows) the unsparsified file to the given size.

    For a non-sparse image the underlying file is truncated directly and
    the header is re-analyzed. For a sparse image the chunk that contains
    the new end is rewritten (or dropped), the chunk/block totals are
    recomputed, the sparse file is cut and the header is re-analyzed.
    Growing a sparse image appends a DONT_CARE region instead.

    Arguments:
        size: Desired size of unsparsified file.

    Raises:
        ValueError: If the image is sparse and the desired size isn't a
            multiple of the block size.
        HvbError: If the kept part of a split chunk is not block aligned or
            the chunk has an unknown type.
    """

    if not self.is_sparse:
        self.img_handler.truncate(size)
        self.header_analyze()
        return

    if size % self.block_size != 0:
        raise ValueError('Cannot truncate to a size which is not a multiple '
                         'of the block size')

    if size == self.img_size:
        # Already at the requested size: nothing to do.
        return

    if size < self.img_size:
        # Locate the chunk whose unsparsified range contains the new end.
        chunk_idx = bisect.bisect_right(self.nsparsed_chunk_offset_list, size) - 1
        chunk = self.chunks[chunk_idx]
        if chunk.nsparsed_chunk_offset != size:
            # Truncation in the middle of a chunk - need to keep the chunk
            # and modify it.
            chunk_idx_for_update = chunk_idx + 1
            num_to_keep = size - chunk.nsparsed_chunk_offset
            if num_to_keep % self.block_size != 0:
                raise HvbError("Remainder size of bytes must be multiple of the block size.")

            # NOTE(review): this branch derives the file position from
            # nsparsed_chunk_offset, while the boundary branch below uses
            # chunk_offset - confirm which attribute is the offset inside
            # the sparse file.
            if chunk.chunk_type == ImageChunk.CHUNK_TYPE_RAW:
                # RAW: header followed by the bytes that are kept.
                truncate_at = (chunk.nsparsed_chunk_offset +
                               struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT) + num_to_keep)
                data_sz = num_to_keep
            elif chunk.chunk_type == ImageChunk.CHUNK_TYPE_FILL:
                # FILL: header followed by the 4-byte fill pattern.
                truncate_at = (chunk.nsparsed_chunk_offset +
                               struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT) + 4)
                data_sz = 4
            elif chunk.chunk_type == ImageChunk.CHUNK_TYPE_DONT_CARE:
                # DONT_CARE: header only, no payload.
                truncate_at = chunk.nsparsed_chunk_offset + struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)
                data_sz = 0
            else:
                raise HvbError("Invalid chunk type.")
            # Rewrite the chunk header with the reduced block count and size.
            chunk_sz = num_to_keep // self.block_size
            total_sz = data_sz + struct.calcsize(ImageChunk.CHUNK_HEADER_FORMAT)
            self.img_handler.seek(chunk.nsparsed_chunk_offset)
            self.img_handler.write(struct.pack(ImageChunk.CHUNK_HEADER_FORMAT,
                                               chunk.chunk_type,
                                               0,  # Reserved
                                               chunk_sz, total_sz))
            # NOTE(review): this assigns output_size, whereas the block
            # recount below reads nsparsed_output_size - confirm the recount
            # sees the shortened chunk.
            chunk.output_size = num_to_keep
        else:
            # Truncation at chunk boundary: drop this chunk and all after it.
            truncate_at = chunk.chunk_offset
            chunk_idx_for_update = chunk_idx

        # Recompute the totals over the chunks that remain.
        self.total_chunks = chunk_idx_for_update
        self.total_blks = 0
        for i in range(0, chunk_idx_for_update):
            self.total_blks += self.chunks[i].nsparsed_output_size // self.block_size
        self.update_chunks_and_blocks()
        self.img_handler.truncate(truncate_at)

        # We've modified the file so re-read all data.
        self.header_analyze()
    else:
        # Truncating to grow - just add a DONT_CARE section.
        self.append_dont_care(size - self.img_size)
def hvb_image_info(self):
    """Append the image-info section to the certificate being built.

    The section is packed with native ``struct`` layout in this order:
    original image length (Q), padded image length (Q), the partition name
    as a fixed 64-byte NUL-padded field, then the rollback location and
    rollback index (2Q) taken from the parsed command-line parameters.

    Raises:
        HvbError: If no partition name is set, or its UTF-8 encoding does
            not fit into the 64-byte name field.
    """
    max_partition_name_len = 64
    if self.partition is None:
        raise HvbError("The partition name is required to build the image info.")

    # Pad on the encoded length: a non-ASCII name occupies more bytes than
    # characters, so padding by character count would overflow the field.
    encoded_name = self.partition.encode('utf-8')
    if len(encoded_name) > max_partition_name_len:
        raise HvbError("Partition name is longer than {} bytes: {}".format(
            max_partition_name_len, self.partition))
    partition_name = encoded_name.ljust(max_partition_name_len, b'\0')

    self.hvb_cert_content += b''.join([
        struct.pack('Q', self.original_image_length),
        struct.pack('Q', self.img_len_with_padding),
        partition_name,
        struct.pack('2Q', int(_params['rollback_location']), int(_params['rollback_index'])),
    ])
def image_hash_tree(self):
    """Pack the hashtree descriptor of the certificate into bytes.

    Seven native unsigned 64-bit values are emitted, in this order:
    hashtree offset (the padded image length), hashtree size, data block
    size, hash block size, FEC root count (always 0), FEC offset
    (240 + salt length + digest length) and FEC size (always 0).

    Returns:
        The packed descriptor as bytes.
    """
    fields = (
        self.img_len_with_padding,                  # hashtree_offset
        len(self.hashtree),                         # hashtree_size
        self.block_size,                            # data_block_size
        self.block_size,                            # hash_block_size
        0,                                          # fec_num_roots
        240 + len(self.salt) + len(self.digest),    # fec_offset
        0,                                          # fec_size
    )
    layout = '{}Q'.format(len(fields))
    return struct.pack(layout, *fields)
def hvb_signature_info(self):
    """Append the signature section to the certificate and sign it.

    The section consists of a fixed 112-byte header, the public key block,
    the signature and the optional user id. The certificate content up to
    and including the key block is written to a temporary file in the
    current working directory and signed either by the external signing
    helper (when one is configured) or through get_signature_content().
    Both temporary files are removed again, also when signing fails.

    Raises:
        HvbError: If the algorithm is unknown, the salt is not 32 bytes
            long while a signing helper is used, or signing fails.

    Exits the process with status 1 if the public key cannot be loaded.
    """
    work_dir = os.getcwd()
    pid = os.getpid()
    sig_content_bin = os.sep.join([work_dir, 'sigcontent.{0}.bin'.format(pid)])
    to_be_sig_content_bin = os.sep.join([work_dir, 'tmp.{0}.bin'.format(pid)])
    # Signature algorithm ids: 0: SHA256_RSA3072 (default), 1: SHA256_RSA4096, 2: SHA256_RSA2048
    if self.algo not in ALGORITHMS:
        raise HvbError("Unknown algorithm: {}".format(self.algo))
    algo = ALGORITHMS[self.algo]
    flags = 0  # Reserved flag bits, all zero by default.

    # The length field must describe the encoded bytes that are appended
    # after the signature, not the number of characters.
    userid = b'' if self.user_id is None else self.user_id.encode()
    user_id_len = len(userid)

    # 144 bytes precede the hash info (48-byte header + 96-byte image info);
    # 112 is the size of the fixed signature header packed below.
    keyblock_offset = 144 + self.hashinfo_size + 112

    try:
        if self.algo == "SM2":
            key = SMPublicKey(self.pubkey)
        else:
            key = RSAPublicKey(self.pubkey)
    except HvbError:
        sys.exit(1)
    keyblock = key.get_public_key()

    signature_offset = keyblock_offset + len(keyblock)
    # Number of bytes that get signed: everything up to the end of the key block.
    sig_length = len(self.hvb_cert_content) + 112 + len(keyblock)
    user_id_offset = signature_offset + algo.sig_bytes

    print("user_id_offset: {}, user_id_len: {}".format(user_id_offset, user_id_len))

    # Every field is packed on its own so no alignment padding is inserted.
    self.hvb_cert_content += b''.join([
        struct.pack('Q', sig_length),
        struct.pack('I', algo.sig_algo),
        struct.pack('I', flags),
        struct.pack('Q', keyblock_offset),
        struct.pack('Q', len(keyblock)),
        struct.pack('Q', signature_offset),
        struct.pack('Q', algo.sig_bytes),
        struct.pack('Q', user_id_offset),
        struct.pack('I', user_id_len),
        b'\0' * 52,  # reserved 52 bytes
        keyblock,
    ])

    if os.path.exists(sig_content_bin):
        os.remove(sig_content_bin)

    modes = stat.S_IWUSR | stat.S_IRUSR
    try:
        # O_TRUNC: a stale, longer file left behind by an earlier process
        # with the same pid must not leak trailing bytes into the signed data.
        write_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        with os.fdopen(os.open(to_be_sig_content_bin, write_flags, modes), 'wb') as tmp_fd:
            tmp_fd.write(self.hvb_cert_content)

        if self.signing_helper_with_files is not None:
            # len(self.salt) is a byte count, although the message says "bit".
            if len(self.salt) != 32:
                raise HvbError("Salt len must be 32 bit, but current salt len is {} bit!".format(len(self.salt)))
            helper = subprocess.Popen([self.signing_helper_with_files, self.algo,
                                       self.pubkey, to_be_sig_content_bin, sig_content_bin])
            if helper.wait() != 0:
                raise HvbError("Failed to sign the image by {}".format(self.signing_helper_with_files))
        else:
            self.get_signature_content(sig_content_bin, to_be_sig_content_bin)

        with os.fdopen(os.open(sig_content_bin, os.O_RDONLY, modes), 'rb') as sig_fd:
            sigcontent = sig_fd.read()
    finally:
        # Clean up on success and on failure alike.
        for leftover in (sig_content_bin, to_be_sig_content_bin):
            if os.path.exists(leftover):
                os.remove(leftover)

    self.hvb_cert_content += sigcontent + userid
def hvb_footer_info(self):
    """Build the image footer and store it in self.footer.

    The footer is the 4-byte magic followed by 4 NUL bytes, then four
    native unsigned 64-bit values (certificate offset, certificate size,
    original image length, partition size) and 64 reserved NUL bytes.

    The certificate offset defaults to the end of the padded image plus
    the hashtree. When a padding size was given it is instead placed that
    many bytes before the end of the partition.

    Side effects:
        Normalizes self.partition_size to int and prints the computed
        certificate size/offset and the partition size in hex.
    """
    footer_magic = self.MAGIC + b'\0' * 4
    self.partition_size = int(self.partition_size)

    cert_size = len(self.hvb_cert_content)
    if self.padding_size is None:
        cert_offset = self.img_len_with_padding + len(self.hashtree)
    else:
        # Command-line values are stored as strings, so convert before
        # doing arithmetic; int() is a no-op for values that are already int.
        cert_offset = self.partition_size - int(self.padding_size)

    print("cert_size: %x, cert_offset: %x, partition_size: %x" % (cert_size, cert_offset, self.partition_size))
    self.footer = footer_magic \
        + struct.pack('4Q', cert_offset, cert_size, self.original_image_length, self.partition_size) \
        + b'\0' * 64
self.pubkey_num_per_ptn) + b'\0' * self.RVT_RESERVED # rvt_reversed: 60 bytes 1273 cur_sizes = len(rvtcontent) 1274 1275 for index in range(0, verity_num, self.pubkey_num_per_ptn): 1276 chain_partition_data = _params['chain_partition'][index].split(':') 1277 partition_name = chain_partition_data[0].strip() 1278 pubkey = chain_partition_data[1].strip() 1279 if self.pubkey_num_per_ptn == 2: 1280 chain_partition_data_backup = _params['chain_partition'][index + 1].split(':') 1281 partition_name_backup = chain_partition_data_backup[0].strip() 1282 if partition_name != partition_name_backup: 1283 print("partition name doesnt match for pubkey_num_per_ptn: %d" % (self.pubkey_num_per_ptn)) 1284 sys.exit(1) 1285 pubkey_backup = chain_partition_data_backup[1].strip() 1286 1287 try: 1288 if self.algo == 'SM2': 1289 key = SMPublicKey(pubkey) 1290 if self.pubkey_num_per_ptn == 2: 1291 key_backup = SMPublicKey(pubkey_backup) 1292 else: 1293 key = RSAPublicKey(pubkey) 1294 if self.pubkey_num_per_ptn == 2: 1295 key_backup = RSAPublicKey(pubkey_backup) 1296 except HvbError as err: 1297 sys.exit(1) 1298 pubkey_payload = key.get_public_key() 1299 if self.pubkey_num_per_ptn == 2: 1300 pubkey_payload_backup = key_backup.get_public_key() 1301 pubkey_len = len(pubkey_payload) 1302 pubkey_offset = cur_sizes + 80 1303 rvtcontent += partition_name.encode() + b'\0' * (64 - len(partition_name)) # partition_name 1304 rvtcontent += struct.pack('Q', pubkey_offset) # pubkey_offset 1305 rvtcontent += struct.pack('Q', pubkey_len) # pubkey_len 1306 rvtcontent += pubkey_payload # pubkey_payload 1307 if self.pubkey_num_per_ptn == 2: 1308 rvtcontent += pubkey_payload_backup # pubkey_payload_backup 1309 cur_sizes += 80 + pubkey_len * self.pubkey_num_per_ptn 1310 1311 flags = os.O_WRONLY | os.O_RDONLY | os.O_CREAT 1312 modes = stat.S_IWUSR | stat.S_IRUSR 1313 with os.fdopen(os.open(self.img, flags, modes), 'wb') as rvt_fd: 1314 rvt_fd.write(rvtcontent) 1315 1316 self.original_image_info() 1317 
def hvb_combine_image_info(self):
    """Write the signed image: payload, hashtree, certificate and footer.

    The source image is copied to the output path. The copy is grown to a
    multiple of the block size (non-sparse images only), then the hashtree
    and the certificate are appended. The certificate is followed by NUL
    padding so that the footer ends exactly at the partition size.

    Raises:
        HvbError: If a sparse image is not block aligned, or the partition
            is too small for image, hashtree, certificate and footer.
    """
    hashtree_length = len(self.hashtree)
    hvb_cert_length = len(self.hvb_cert_content)
    source_path = os.path.abspath(self.img)
    signed_image = os.path.abspath(self.signed_img)

    shutil.copy(source_path, signed_image)
    image = ImageHandle(signed_image)
    try:
        aligned_size = round_to_multiple(image.img_size, self.block_size)
        if aligned_size > image.img_size:
            if image.is_sparse is True:  # Original image size must be multiple of block_size
                raise HvbError("The sparse image size is not multiple of the block size.")
            image.truncate(aligned_size)

        # Smallest acceptable tail: certificate plus footer rounded up to a
        # whole number of blocks (length only, no buffer is built for it).
        tail_length = hvb_cert_length + self.FOOTER_SIZE
        min_tail_length = (round_to_multiple(tail_length, self.block_size)
                           - self.FOOTER_SIZE + len(self.footer))
        if self.partition_size < image.img_size + hashtree_length + min_tail_length:
            raise HvbError("[hvbtool][ERROR]: Partition size is too small!")

        # Fill the gap between certificate and footer so that the footer
        # occupies the very end of the partition.
        gap = self.partition_size - image.img_size - hashtree_length - tail_length
        cert_and_footer = self.hvb_cert_content + b'\0' * gap + self.footer
        image.append_raw(self.hashtree)
        image.append_raw(cert_and_footer)
    finally:
        # Release the output handle on the error paths as well.
        if image.img_handler:
            image.img_handler.close()
def hvb_calc_max_image(self):
    """Print the largest image that fits the partition, then exit.

    Does nothing unless calc_max_image_size is "true". Otherwise the space
    reserved for metadata (MAX_RVT_SIZE + MAX_FOOTER_SIZE, plus the
    hashtree for the 'hashtree' verity type) is subtracted from the
    partition size, the result is printed and the process exits with
    status 0.

    The hashtree is sized with the digest length rounded up to a power of
    two, the same value that is used when the tree is actually generated.
    """
    if self.calc_max_image_size != "true":
        return

    max_metadata_size = self.MAX_RVT_SIZE + self.MAX_FOOTER_SIZE

    if self.vertype == 'hashtree':
        if self.hash_algo == 'SM3':
            # Same special case as the hashtree generation path.
            digest_size = ALGORITHMS[self.algo].hash_bytes
        else:
            # The digest length does not depend on the salt.
            digest_size = hashlib.new(self.hash_algo).digest_size
        digest_padding = 2 ** ((digest_size - 1).bit_length()) - digest_size

        _, hashtree_size = calc_hashtree_size(self.partition_size, self.block_size,
                                              digest_size + digest_padding)
        max_metadata_size += hashtree_size

    max_image_size = self.partition_size - max_metadata_size
    print("\ncalc_max_image_size: {}".format(max_image_size))
    sys.exit(0)
def hvb_verify_image(self):
    """Verify the certificate embedded in the image against the given key.

    The footer is read from the last FOOTER_SIZE bytes of the image and
    used to locate the certificate. The check fails if the embedded public
    key differs from the supplied one, or if the certificate's signature or
    digest verification fails.

    Raises:
        HvbError: If the image cannot be parsed or any check fails; the
            original error is attached as the cause.

    Exits the process with status 1 if the public key cannot be loaded.
    """
    try:
        key_blob = RSAPublicKey(self.pubkey).get_public_key()
        print('Verifying image {} using key at {}'.format(self.img, self.pubkey))
    except HvbError:
        print('Pubkey {} is invalid!'.format(self.pubkey))
        sys.exit(1)

    # Bound before the try block so the cleanup below is safe even when
    # opening the image itself fails.
    image = None
    try:
        image = ImageHandle(self.img)
        image.seek(self.original_image_length - self.FOOTER_SIZE)
        footer_bin = image.read(self.FOOTER_SIZE)
        footer = HvbFooter(footer_bin)

        image.seek(footer.certoffset)
        cert_bin = image.read(footer.certsize)
        cert = HvbCert(cert_bin)
        if key_blob and key_blob != cert.key:
            raise HvbError('Embedded public key does not match given key.')
        if not cert.verify_signature(self.pubkey):
            raise HvbError('Failed check signature for {}!'.format(self.img))
        if not cert.verify_digest(image):
            raise HvbError('Failed check digest for {}!'.format(self.img))
        print('Successfully verified hvb cert for {}!'.format(self.img))
    except (HvbError, struct.error) as err:
        # Keep the specific reason reachable through the exception chain.
        raise HvbError("Failed to verify image.") from err
    finally:
        if image is not None and image.img_handler:
            image.img_handler.close()
def parse_arguments(argvs):
    """Parse the command line into the module-level parameter table.

    The argument vector is read as pairs: the first pair is the script
    path and the action, every following pair is an option name (its first
    two characters are dropped, e.g. the leading "--") and its value.
    Values are stored stripped; "chain_partition" may be repeated and its
    values are collected in a list. A random salt is generated when none
    was given.

    Arguments:
        argvs: The full argument vector, including the script path.

    Returns:
        The action name with surrounding whitespace removed.

    Exits the process with status 1 when the vector is empty, has an odd
    number of entries, or contains an unknown option.
    """
    if not argvs or len(argvs) % 2 != 0:
        print_help()
        print("[hvbtool][ERROR]: invalid argument format!")
        sys.exit(1)

    # Strip once and use the same value for the lookup and the result, so
    # an action with stray whitespace cannot pass the test and then miss
    # the table.
    act = argvs[1].strip()
    _params['verity_type'] = VERITY_TYPE.get(act, 'hash')

    for option, value in zip(argvs[2::2], argvs[3::2]):
        itemkey = option.strip()[2:]
        if itemkey not in _params:
            print("[hvbtool][ERROR]: Unknown argument: %s" % option)
            sys.exit(1)
        if itemkey == "chain_partition":
            _params[itemkey].append(value.strip())
        else:
            _params[itemkey] = value.strip()

    if _params['salt'] is None:
        _params['salt'] = create_random_data()
    return act
def main(obj, act):
    """Run the tool operation selected by the action name.

    Arguments:
        obj: The tool object providing the hvb_* operations.
        act: Action name as given on the command line.

    Raises:
        HvbError: If the action is not one of the supported names.
    """
    dispatch = (
        (('make_hash_footer', 'make_hashtree_footer'), 'hvb_make_image'),
        (('parse_image',), 'hvb_parse_image'),
        (('make_rvt_image',), 'hvb_make_rvt_image'),
        (('erase_image',), 'hvb_erase_image'),
        (('verify_image',), 'hvb_verify_image'),
    )
    for action_names, method_name in dispatch:
        if act in action_names:
            # Look the operation up only once its action has matched.
            getattr(obj, method_name)()
            return
    raise HvbError("Unknown action: {}".format(act))