1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2022 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16""" 17Description : Generate the update.bin file 18""" 19import os 20import struct 21import hashlib 22import subprocess 23from log_exception import UPDATE_LOGGER 24from utils import OPTIONS_MANAGER 25from cryptography.hazmat.primitives import serialization 26from cryptography.hazmat.primitives import hashes 27from cryptography.hazmat.backends import default_backend 28from cryptography.hazmat.primitives.asymmetric import padding 29 30UPGRADE_FILE_HEADER_LEN = 180 31UPGRADE_RESERVE_LEN = 16 32SIGN_SHA256_LEN = 256 33SIGN_SHA384_LEN = 384 34UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN 35TLV_SIZE = 4 36UPGRADE_PKG_HEADER_SIZE = 136 37UPGRADE_PKG_TIME_SIZE = 32 38UPGRADE_COMPINFO_SIZE = 71 39UPGRADE_COMPINFO_SIZE_L2 = 87 40COMPONEBT_ADDR_SIZE = 16 41COMPONEBT_ADDR_SIZE_L2 = 32 42COMPONENT_INFO_FMT_SIZE = 5 43COMPONENT_VERSION_SIZE = 10 44COMPONENT_SIZE_FMT_SIZE = 8 45COMPONENT_DIGEST_SIZE = 32 46BLCOK_SIZE = 8192 47HEADER_TLV_TYPE = 0x11 48HEADER_TLV_TYPE_L2 = 0x01 49# signature algorithm 50SIGN_ALGO_RSA = "SHA256withRSA" 51SIGN_ALGO_PSS = "SHA256withPSS" 52 53""" 54Format 55H: unsigned short 56I: unsigned int 57B: unsigned char 58s: char[] 59""" 60TLV_FMT = "2H" 61UPGRADE_PKG_HEADER_FMT = "2I64s64s" 62UPGRADE_PKG_TIME_FMT = "16s16s" 63COMPONENT_INFO_FMT = "H3B" 64COMPONENT_SIZE_FMT = "iI" 65 66 67class CreatePackage(object): 68 """ 69 Create the update.bin file 70 """ 71 72 def __init__(self, head_list, component_list, save_path, key_path): 73 self.head_list = head_list 74 self.component_list = component_list 75 self.save_path = save_path 76 self.key_path = key_path 77 self.compinfo_offset = 0 78 self.component_offset = 0 79 self.sign_offset = 0 80 81 if OPTIONS_MANAGER.not_l2: 82 self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE 83 self.header_tlv_type = HEADER_TLV_TYPE 84 else: 85 self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2 86 self.header_tlv_type = HEADER_TLV_TYPE_L2 87 88 def verify_param(self): 89 if self.head_list is None or self.component_list is None or \ 90 self.save_path is None or self.key_path is None: 91 UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG) 92 return False 93 if os.path.isdir(self.key_path): 94 UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG) 95 return False 96 if self.head_list.__sizeof__() <= 0 or self.component_list.__sizeof__() <= 0: 97 UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG) 98 return False 99 return True 100 101 def write_pkginfo(self, package_file): 102 try: 103 # Type is 1 for package header in TLV format 104 header_tlv = struct.pack(TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE) 105 pkg_info_length = \ 106 UPGRADE_RESERVE_LEN + TLV_SIZE + TLV_SIZE + TLV_SIZE + \ 107 UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \ 108 self.upgrade_compinfo_size * self.head_list.entry_count 109 upgrade_pkg_header = struct.pack( 110 UPGRADE_PKG_HEADER_FMT, pkg_info_length, self.head_list.update_file_version, 111 self.head_list.product_update_id, self.head_list.software_version) 112 113 # Type is 2 for time in TLV format 114 time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE) 115 upgrade_pkg_time = struct.pack( 116 UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time) 117 118 # Type is 5 for component in TLV format 119 component_tlv = struct.pack( 120 TLV_FMT, 0x05, self.upgrade_compinfo_size * self.head_list.entry_count) 121 except struct.error: 122 UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG) 123 return False 124 125 # write pkginfo 126 pkginfo = header_tlv + upgrade_pkg_header + time_tlv + upgrade_pkg_time + component_tlv 127 try: 128 package_file.write(pkginfo) 129 except IOError: 130 UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG) 131 return False 132 UPDATE_LOGGER.print_log("Write package header complete") 133 return True 134 135 def write_component_info(self, component, package_file): 136 UPDATE_LOGGER.print_log("component information StartOffset:%s"\ 137 % self.compinfo_offset) 138 if OPTIONS_MANAGER.not_l2: 139 component_addr_size = COMPONEBT_ADDR_SIZE 140 else: 141 component_addr_size = COMPONEBT_ADDR_SIZE_L2 142 143 try: 144 package_file.seek(self.compinfo_offset) 145 package_file.write(component.component_addr) 146 self.compinfo_offset += component_addr_size 147 148 package_file.seek(self.compinfo_offset) 149 component_info = struct.pack( 150 COMPONENT_INFO_FMT, component.id, component.res_type, 151 component.flags, component.type) 152 package_file.write(component_info) 153 self.compinfo_offset += COMPONENT_INFO_FMT_SIZE 154 155 package_file.seek(self.compinfo_offset) 156 package_file.write(component.version) 157 self.compinfo_offset += COMPONENT_VERSION_SIZE 158 159 package_file.seek(self.compinfo_offset) 160 component_size = struct.pack( 161 COMPONENT_SIZE_FMT, component.size, component.original_size) 162 package_file.write(component_size) 163 self.compinfo_offset += COMPONENT_SIZE_FMT_SIZE 164 165 package_file.seek(self.compinfo_offset) 166 package_file.write(component.digest) 167 self.compinfo_offset += COMPONENT_DIGEST_SIZE 168 except (struct.error, IOError): 169 return False 170 return True 171 172 def write_component(self, component, package_file): 173 UPDATE_LOGGER.print_log("Add component to package StartOffset:%s"\ 174 % self.component_offset) 175 try: 176 with open(component.file_path, "rb") as component_file: 177 component_data = component_file.read() 178 package_file.seek(self.component_offset) 179 package_file.write(component_data) 180 component_len = len(component_data) 181 self.component_offset += component_len 182 except IOError: 183 return False 184 UPDATE_LOGGER.print_log("Write component complete ComponentSize:%s"\ 185 % component_len) 186 return True 187 188 def calculate_hash(self, package_file): 189 hash_sha256 = hashlib.sha256() 190 remain_len = self.component_offset 191 192 package_file.seek(0) 193 while remain_len > BLCOK_SIZE: 194 hash_sha256.update(package_file.read(BLCOK_SIZE)) 195 remain_len -= BLCOK_SIZE 196 if remain_len > 0: 197 hash_sha256.update(package_file.read(remain_len)) 198 return hash_sha256.digest() 199 200 def sign_digest_with_pss(self, digset): 201 try: 202 with open(self.key_path, 'rb') as f_r: 203 key_data = f_r.read() 204 private_key = serialization.load_pem_private_key( 205 key_data, 206 password=None, 207 backend=default_backend()) 208 209 pad = padding.PSS( 210 mgf=padding.MGF1(hashes.SHA256()), 211 salt_length=padding.PSS.MAX_LENGTH) 212 213 signature = private_key.sign(digset, pad, hashes.SHA256()) 214 except (OSError, ValueError): 215 return False 216 return signature 217 218 def sign_digest(self, digset): 219 try: 220 with open(self.key_path, 'rb') as f_r: 221 key_data = f_r.read() 222 private_key = serialization.load_pem_private_key( 223 key_data, 224 password=None, 225 backend=default_backend()) 226 signature = private_key.sign(digset, padding.PKCS1v15(), hashes.SHA256()) 227 except (OSError, ValueError): 228 return False 229 return signature 230 231 def sign(self, sign_algo): 232 with open(self.save_path, "rb+") as package_file: 233 # calculate hash for .bin package 234 digest = self.calculate_hash(package_file) 235 if not digest: 236 UPDATE_LOGGER.print_log("calculate hash for .bin package failed", 237 log_type=UPDATE_LOGGER.ERROR_LOG) 238 return False 239 240 # sign .bin package 241 if sign_algo == SIGN_ALGO_RSA: 242 signature = self.sign_digest(digest) 243 elif sign_algo == SIGN_ALGO_PSS: 244 signature = self.sign_digest_with_pss(digest) 245 else: 246 UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG) 247 return False 248 if not signature: 249 UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG) 250 return False 251 252 if len(signature) == SIGN_SHA384_LEN: 253 self.sign_offset += SIGN_SHA256_LEN 254 255 # write signed .bin package 256 package_file.seek(self.sign_offset) 257 package_file.write(signature) 258 UPDATE_LOGGER.print_log( 259 ".bin package signing success! SignOffset: %s" % self.sign_offset) 260 return True 261 262 def create_package(self): 263 """ 264 Create the update.bin file 265 return: update package creation result 266 """ 267 if not self.verify_param(): 268 UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG) 269 return False 270 package_fd = os.open(self.save_path, os.O_RDWR | os.O_CREAT, 0o755) 271 with os.fdopen(package_fd, "wb+") as package_file: 272 # Add information to package 273 if not self.write_pkginfo(package_file): 274 UPDATE_LOGGER.print_log( 275 "Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG) 276 return False 277 # Add component to package 278 self.compinfo_offset = UPGRADE_FILE_HEADER_LEN 279 self.component_offset = UPGRADE_FILE_HEADER_LEN + \ 280 self.head_list.entry_count * self.upgrade_compinfo_size + \ 281 UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN 282 for i in range(0, self.head_list.entry_count): 283 UPDATE_LOGGER.print_log("Add component %s" 284 % self.component_list[i].component_addr) 285 if not self.write_component_info(self.component_list[i], package_file): 286 UPDATE_LOGGER.print_log("write component info failed: %s" 287 % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG) 288 return False 289 if not self.write_component(self.component_list[i], package_file): 290 UPDATE_LOGGER.print_log("write component failed: %s" 291 % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG) 292 return False 293 try: 294 # Add descriptPackageId to package 295 package_file.seek(self.compinfo_offset) 296 package_file.write(self.head_list.describe_package_id) 297 except IOError: 298 UPDATE_LOGGER.print_log( 299 "Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG) 300 return False 301 try: 302 # Sign 303 self.sign_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN 304 package_file.seek(self.sign_offset) 305 sign_buffer = bytes(UPGRADE_SIGNATURE_LEN) 306 package_file.write(sign_buffer) 307 except IOError: 308 UPDATE_LOGGER.print_log( 309 "Add Sign failed!", log_type=UPDATE_LOGGER.ERROR_LOG) 310 return False 311 UPDATE_LOGGER.print_log("Write update package complete") 312 return True