#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Generate the update.bin file
"""
import os
import struct
import hashlib
import subprocess
from log_exception import UPDATE_LOGGER
from utils import OPTIONS_MANAGER
from create_hashdata import HashType
from create_hashdata import CreateHash
from create_hashdata import HASH_BLOCK_SIZE
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding

# Bytes written by write_pkginfo():
# TLV(4) + package header(136) + TLV(4) + time(32) + TLV(4) = 180.
UPGRADE_FILE_HEADER_LEN = 180
UPGRADE_RESERVE_LEN = 16
SIGN_SHA256_LEN = 256
SIGN_SHA384_LEN = 384
UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN
TLV_SIZE = 4
UPGRADE_PKG_HEADER_SIZE = 136
UPGRADE_PKG_TIME_SIZE = 32
# Per-component record size: addr + info(5) + version(10) + size(8) + digest(32).
UPGRADE_COMPINFO_SIZE = 71
UPGRADE_COMPINFO_SIZE_L2 = 87
COMPONENT_ADDR_SIZE = 16
COMPONENT_ADDR_SIZE_L2 = 32
COMPONENT_INFO_FMT_SIZE = 5
COMPONENT_VERSION_SIZE = 10
COMPONENT_SIZE_FMT_SIZE = 8
COMPONENT_DIGEST_SIZE = 32
BLOCK_SIZE = 8192
# Chunk size used when streaming a component file into the package.
COMPONENT_COPY_CHUNK_SIZE = 1024 * 1024
HEADER_TLV_TYPE = 0x11
HEADER_TLV_TYPE_L2 = 0x01
# signature algorithm
SIGN_ALGO_RSA = "SHA256withRSA"
SIGN_ALGO_PSS = "SHA256withPSS"

# struct format characters used below:
#   H: unsigned short
#   I: unsigned int
#   B: unsigned char
#   s: char[]
TLV_FMT = "2H"
UPGRADE_PKG_HEADER_FMT = "2I64s64s"
UPGRADE_PKG_TIME_FMT = "16s16s"
COMPONENT_INFO_FMT = "H3B"
COMPONENT_SIZE_FMT = "iI"


class CreatePackage(object):
    """
    Create the update.bin file
    """

    def __init__(self, head_list, component_list, save_path, key_path):
        """
        Store the inputs and select the record layout.

        :param head_list: package header object; this class reads its
            entry_count, update_file_version, product_update_id,
            software_version, date, time and describe_package_id attributes
        :param component_list: indexable collection of component objects;
            this class reads component_addr, id, res_type, flags, type,
            version, size, original_size, digest and file_path from each
        :param save_path: path of the package file to create
        :param key_path: path of the PEM private key used for signing
        """
        self.head_list = head_list
        self.component_list = component_list
        self.save_path = save_path
        self.key_path = key_path
        self.compinfo_offset = 0
        self.component_offset = 0
        self.sign_offset = 0
        self.hash_info_offset = 0

        # The component-record size and header TLV type depend on
        # OPTIONS_MANAGER.not_l2.
        if OPTIONS_MANAGER.not_l2:
            self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE
            self.header_tlv_type = HEADER_TLV_TYPE
        else:
            self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2
            self.header_tlv_type = HEADER_TLV_TYPE_L2

    def verify_param(self):
        """
        Check the constructor arguments.

        :return: False when any argument is None or key_path is a
            directory, otherwise True
        """
        if self.head_list is None or self.component_list is None or \
                self.save_path is None or self.key_path is None:
            UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG)
            return False
        if os.path.isdir(self.key_path):
            UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG)
            return False
        # NOTE(review): __sizeof__() reports the in-memory object size, which
        # is always positive, so this branch can never be taken. It was
        # presumably meant as an emptiness check; kept unchanged so that no
        # currently accepted input starts being rejected -- confirm intent.
        if self.head_list.__sizeof__() <= 0 or self.component_list.__sizeof__() <= 0:
            UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG)
            return False
        return True

    def write_pkginfo(self, package_file):
        """
        Write the header TLV, package header, time TLV, package time and
        component TLV at the current position of package_file.

        :param package_file: writable binary file object
        :return: True on success, False when packing or writing fails
        """
        compinfo_total = self.upgrade_compinfo_size * self.head_list.entry_count
        try:
            # Type is 1 for package header in TLV format
            header_tlv = struct.pack(TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE)
            pkg_info_length = \
                UPGRADE_RESERVE_LEN + TLV_SIZE + TLV_SIZE + TLV_SIZE + \
                UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \
                compinfo_total
            upgrade_pkg_header = struct.pack(
                UPGRADE_PKG_HEADER_FMT, pkg_info_length, self.head_list.update_file_version,
                self.head_list.product_update_id, self.head_list.software_version)

            # Type is 2 for time in TLV format
            time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE)
            upgrade_pkg_time = struct.pack(
                UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time)

            # Type is 5 for component in TLV format
            component_tlv = struct.pack(TLV_FMT, 0x05, compinfo_total)
        except struct.error:
            UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False

        # write pkginfo
        pkginfo = header_tlv + upgrade_pkg_header + time_tlv + upgrade_pkg_time + component_tlv
        try:
            package_file.write(pkginfo)
        except IOError:
            UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        UPDATE_LOGGER.print_log("Write package header complete")
        return True

    def write_component_info(self, component, package_file):
        """
        Write one component record at self.compinfo_offset.

        The fields are written in the order address, info, version, size,
        digest. After each field self.compinfo_offset is advanced by that
        field's fixed size constant, not by the number of bytes written.

        :param component: component object (see __init__)
        :param package_file: writable, seekable binary file object
        :return: True on success, False on struct.error or IOError
        """
        UPDATE_LOGGER.print_log("component information StartOffset:%s"\
            % self.compinfo_offset)
        if OPTIONS_MANAGER.not_l2:
            component_addr_size = COMPONENT_ADDR_SIZE
        else:
            component_addr_size = COMPONENT_ADDR_SIZE_L2

        try:
            package_file.seek(self.compinfo_offset)
            package_file.write(component.component_addr)
            self.compinfo_offset += component_addr_size

            package_file.seek(self.compinfo_offset)
            component_info = struct.pack(
                COMPONENT_INFO_FMT, component.id, component.res_type,
                component.flags, component.type)
            package_file.write(component_info)
            self.compinfo_offset += COMPONENT_INFO_FMT_SIZE

            package_file.seek(self.compinfo_offset)
            package_file.write(component.version)
            self.compinfo_offset += COMPONENT_VERSION_SIZE

            package_file.seek(self.compinfo_offset)
            component_size = struct.pack(
                COMPONENT_SIZE_FMT, component.size, component.original_size)
            package_file.write(component_size)
            self.compinfo_offset += COMPONENT_SIZE_FMT_SIZE

            package_file.seek(self.compinfo_offset)
            package_file.write(component.digest)
            self.compinfo_offset += COMPONENT_DIGEST_SIZE
        except (struct.error, IOError):
            return False
        return True

    def write_component(self, component, package_file):
        """
        Copy the file at component.file_path into package_file, starting at
        self.component_offset, and advance that offset by the bytes copied.

        The file is streamed in fixed-size chunks so that a large component
        is never held in memory as a whole.

        :param component: component object providing file_path
        :param package_file: writable, seekable binary file object
        :return: True on success, False on IOError (offset left unchanged)
        """
        UPDATE_LOGGER.print_log("Add component to package StartOffset:%s"\
            % self.component_offset)
        component_len = 0
        try:
            with open(component.file_path, "rb") as component_file:
                package_file.seek(self.component_offset)
                for chunk in iter(
                        lambda: component_file.read(COMPONENT_COPY_CHUNK_SIZE), b""):
                    package_file.write(chunk)
                    component_len += len(chunk)
            self.component_offset += component_len
        except IOError:
            return False
        UPDATE_LOGGER.print_log("Write component complete ComponentSize:%s"\
            % component_len)
        return True

    @staticmethod
    def _hash_prefix(package_file, length):
        """Return the SHA-256 digest of the first `length` bytes of package_file."""
        hash_sha256 = hashlib.sha256()
        remain_len = length

        package_file.seek(0)
        while remain_len > BLOCK_SIZE:
            hash_sha256.update(package_file.read(BLOCK_SIZE))
            remain_len -= BLOCK_SIZE
        if remain_len > 0:
            hash_sha256.update(package_file.read(remain_len))
        return hash_sha256.digest()

    def calculate_hash(self, package_file):
        """
        Hash the package from offset 0 up to self.component_offset.

        :param package_file: readable, seekable binary file object
        :return: SHA-256 digest bytes
        """
        return self._hash_prefix(package_file, self.component_offset)

    def calculate_header_hash(self, package_file):
        """
        Hash the package from offset 0 up to self.hash_info_offset.

        :param package_file: readable, seekable binary file object
        :return: SHA-256 digest bytes
        """
        return self._hash_prefix(package_file, self.hash_info_offset)

    def _sign_with_padding(self, digest, pad):
        """Load the PEM key at self.key_path and sign `digest` with padding `pad`."""
        try:
            with open(self.key_path, 'rb') as f_r:
                key_data = f_r.read()
            private_key = serialization.load_pem_private_key(
                key_data,
                password=None,
                backend=default_backend())
            return private_key.sign(digest, pad, hashes.SHA256())
        except (OSError, ValueError, TypeError):
            # TypeError is raised for an encrypted key when no password is
            # supplied; report it as a signing failure like the other errors
            # instead of letting it escape to the caller.
            return False

    def sign_digest_with_pss(self, digest):
        """
        Sign digest with the private key using PSS padding (MGF1 with
        SHA-256, maximum salt length).

        :param digest: bytes passed to private_key.sign
        :return: signature bytes, or False when the key cannot be read,
            loaded or used
        """
        pad = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH)
        return self._sign_with_padding(digest, pad)

    def sign_digest(self, digest):
        """
        Sign digest with the private key using PKCS#1 v1.5 padding.

        :param digest: bytes passed to private_key.sign
        :return: signature bytes, or False when the key cannot be read,
            loaded or used
        """
        return self._sign_with_padding(digest, padding.PKCS1v15())

    def _sign_by_algo(self, sign_algo, digest):
        """Sign `digest` with the signer selected by `sign_algo`; log and return False on failure."""
        if sign_algo == SIGN_ALGO_RSA:
            signature = self.sign_digest(digest)
        elif sign_algo == SIGN_ALGO_PSS:
            signature = self.sign_digest_with_pss(digest)
        else:
            UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        if not signature:
            UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        return signature

    def sign(self, sign_algo):
        """
        Sign the package at self.save_path and write the signature at
        self.sign_offset.

        :param sign_algo: SIGN_ALGO_RSA or SIGN_ALGO_PSS
        :return: True on success, False when hashing or signing fails or
            sign_algo is not recognised
        """
        with open(self.save_path, "rb+") as package_file:
            # calculate hash for .bin package
            digest = self.calculate_hash(package_file)
            if not digest:
                UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
                    log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            # sign .bin package
            signature = self._sign_by_algo(sign_algo, digest)
            if not signature:
                return False

            # A 384-byte signature is written SIGN_SHA256_LEN bytes further
            # on. Note that this permanently advances self.sign_offset.
            if len(signature) == SIGN_SHA384_LEN:
                self.sign_offset += SIGN_SHA256_LEN

            # write signed .bin package
            package_file.seek(self.sign_offset)
            package_file.write(signature)
            UPDATE_LOGGER.print_log(
                ".bin package signing success! SignOffset: %s" % self.sign_offset)
            return True

    def sign_header(self, sign_algo, hash_check_data, package_file):
        """
        Sign the package header (bytes 0 to self.hash_info_offset) and write
        the resulting sign data at self.hash_info_offset, which is then
        advanced past it.

        :param sign_algo: SIGN_ALGO_RSA or SIGN_ALGO_PSS
        :param hash_check_data: CreateHash instance; its write_signdata()
            receives the signature and its signdata is written to the file
        :param package_file: readable, writable, seekable binary file object
        :return: True on success, False when hashing or signing fails or
            sign_algo is not recognised
        """
        # calculate hash for .bin package
        digest = self.calculate_header_hash(package_file)
        if not digest:
            UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
                log_type=UPDATE_LOGGER.ERROR_LOG)
            return False

        # sign .bin header
        signature = self._sign_by_algo(sign_algo, digest)
        if not signature:
            return False

        # write signed .bin header
        hash_check_data.write_signdata(signature)
        package_file.seek(self.hash_info_offset)
        package_file.write(hash_check_data.signdata)
        self.hash_info_offset += len(hash_check_data.signdata)
        UPDATE_LOGGER.print_log(
            ".bin package header signing success! SignOffset: %s" % self.hash_info_offset)
        return True

    def create_package(self):
        """
        Create the update.bin file

        Writes, in order: the package info header, one info record per
        component, the describe-package id padded to UPGRADE_RESERVE_LEN,
        then (only when OPTIONS_MANAGER.sd_card is set) the hash check data
        and the header signature, and finally the component payloads.

        return: update package creation result (True on success, False as
        soon as any step fails)
        """
        if not self.verify_param():
            UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG)
            return False

        entry_count = self.head_list.entry_count
        hash_check_data = CreateHash(HashType.SHA256, entry_count)
        hash_check_data.write_hashinfo()
        package_fd = os.open(self.save_path, os.O_RDWR | os.O_CREAT, 0o755)
        with os.fdopen(package_fd, "wb+") as package_file:
            # Add information to package
            if not self.write_pkginfo(package_file):
                UPDATE_LOGGER.print_log(
                    "Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False
            # Add component to package
            self.compinfo_offset = UPGRADE_FILE_HEADER_LEN
            # Default payload start: header, component records, reserved
            # area and both signature slots. Overridden below for sd_card.
            self.component_offset = UPGRADE_FILE_HEADER_LEN + \
                entry_count * self.upgrade_compinfo_size + \
                UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN
            for i in range(0, entry_count):
                UPDATE_LOGGER.print_log("Add component %s"
                    % self.component_list[i].component_addr)
                if not self.write_component_info(self.component_list[i], package_file):
                    UPDATE_LOGGER.print_log("write component info failed: %s"
                        % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False
                if OPTIONS_MANAGER.sd_card and (not hash_check_data.write_component_hash_data(self.component_list[i])):
                    UPDATE_LOGGER.print_log("write component hash data failed: %s"
                        % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False

            try:
                # Add descriptPackageId to package
                package_file.seek(self.compinfo_offset)
                package_file.write(
                    (self.head_list.describe_package_id.decode().ljust(UPGRADE_RESERVE_LEN, "\0")).encode())
            except IOError:
                UPDATE_LOGGER.print_log(
                    "Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            self.hash_info_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN
            if OPTIONS_MANAGER.sd_card:
                try:
                    # Add hash check data to package
                    hash_check_data.write_hashdata()
                    package_file.seek(self.hash_info_offset)
                    package_file.write(hash_check_data.hashinfo_value + hash_check_data.hashdata)
                    self.hash_info_offset += len(hash_check_data.hashinfo_value + hash_check_data.hashdata)

                except IOError:
                    UPDATE_LOGGER.print_log(
                        "Add hash check data failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                    return False
                # A header that could not be signed must fail the build;
                # sign_header() has already logged the reason.
                if not self.sign_header(SIGN_ALGO_RSA, hash_check_data, package_file):
                    return False
                # Payloads start right after the hash data and header
                # signature in the sd_card layout.
                self.component_offset = self.hash_info_offset
            for i in range(0, entry_count):
                if not self.write_component(self.component_list[i], package_file):
                    UPDATE_LOGGER.print_log("write component failed: %s"
                        % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False

        UPDATE_LOGGER.print_log("Write update package complete")
        return True