#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Build the update package: update.bin, build_tools.zip and the final
(signed) zip archive, driven by the values stored in OPTIONS_MANAGER.
"""
import binascii
import copy
import os
import shutil
import subprocess
import tempfile
import time
import collections as collect
import enum
import ctypes
import zipfile

from log_exception import UPDATE_LOGGER
from script_generator import create_script
from utils import sign_package
from utils import HASH_CONTENT_LEN_DICT
from utils import OPTIONS_MANAGER
from utils import REGISTER_SCRIPT_FILE_NAME
from utils import ON_SERVER
from utils import SCRIPT_KEY_LIST
from utils import EXTEND_OPTIONAL_COMPONENT_LIST
from utils import COMPONENT_INFO_INNIT
from utils import UPDATE_EXE_FILE_NAME
from utils import TOTAL_SCRIPT_FILE_NAME
from utils import EXTEND_COMPONENT_LIST
from utils import LINUX_HASH_ALGORITHM_DICT
from utils import UPDATE_BIN_FILE_NAME
from utils import BUILD_TOOLS_FILE_NAME
from utils import SIGN_PACKAGE_EVENT
from utils import GENERATE_SIGNED_DATA_EVENT
from create_update_package import CreatePackage
from create_update_package import SIGN_ALGO_RSA
from create_update_package import SIGN_ALGO_PSS
from create_signed_data import generate_signed_data_default

# Value written to PkgComponent.flags for every component.
IS_DEL = 0
SIGNING_LENGTH_256 = 256
# Number of digest bytes stored in PkgComponent.digest.
DIGEST_LEN = 32
HASH_VALUE_MAX_LEN = 128


class SignMethod(enum.Enum):
    """Values written to PkgHeader.sign_method for local signing."""
    RSA = 1
    ECC = 2


class PkgHeader(ctypes.Structure):
    """Header information of a package, filled by get_head_list() and
    get_tools_head_list()."""
    _fields_ = [("digest_method", ctypes.c_ubyte),
                ("sign_method", ctypes.c_ubyte),
                ("pkg_type", ctypes.c_ubyte),
                ("pkg_flags", ctypes.c_ubyte),
                ("entry_count", ctypes.c_int),
                ("update_file_version", ctypes.c_int),
                ("product_update_id", ctypes.c_char_p),
                ("software_version", ctypes.c_char_p),
                ("date", ctypes.c_char_p),
                ("time", ctypes.c_char_p),
                ("describe_package_id", ctypes.c_char_p)]


class PkgComponent(ctypes.Structure):
    """Information of one component, filled by get_component_list() and
    get_tools_component_list()."""
    _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN),
                ("file_path", ctypes.c_char_p),
                ("component_addr", ctypes.c_char_p),
                ("version", ctypes.c_char_p),
                ("size", ctypes.c_int),
                ("id", ctypes.c_int),
                ("original_size", ctypes.c_int),
                ("res_type", ctypes.c_ubyte),
                ("type", ctypes.c_ubyte),
                ("flags", ctypes.c_ubyte)]


class SignInfo(ctypes.Structure):
    """Signature information structure (not referenced in this file)."""
    _fields_ = [("sign_offset", ctypes.c_int),
                ("hash_len", ctypes.c_int),
                ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]


def _get_extend_components():
    """
    Get the extended components that precede the image components.
    :return: (names, paths), two lists of the same length. Both are empty
             when OPTIONS_MANAGER.not_l2 is set; the optional components
             and the partition file are only included when
             OPTIONS_MANAGER.partition_file_obj is available.
    """
    if OPTIONS_MANAGER.not_l2:
        return [], []
    names = list(EXTEND_COMPONENT_LIST)
    paths = [OPTIONS_MANAGER.version_mbn_file_path,
             OPTIONS_MANAGER.board_list_file_path]
    if OPTIONS_MANAGER.partition_file_obj is not None:
        names = names + list(EXTEND_OPTIONAL_COMPONENT_LIST)
        paths.append(OPTIONS_MANAGER.partition_file_obj.name)
    return names, paths


def _write_update_bin(update_bin_path):
    """
    Collect header/component information and write update.bin.
    :param update_bin_path: path of the file CreatePackage writes to
    :return: True on success, False otherwise.
    """
    source_component_dict = OPTIONS_MANAGER.component_info_dict
    extend_names, _ = _get_extend_components()
    all_image_name = extend_names + OPTIONS_MANAGER.full_img_list

    # Order the components: extended components first, then the images.
    # Names without component information map to None and are filled
    # with defaults by get_component_list().
    sort_component_dict = collect.OrderedDict()
    for each_image_name in all_image_name:
        sort_component_dict[each_image_name] = \
            source_component_dict.get(each_image_name)
    component_dict = copy.deepcopy(sort_component_dict)

    head_list = get_head_list(
        len(component_dict), OPTIONS_MANAGER.head_info_list)
    component_list = get_component_list(
        OPTIONS_MANAGER.full_image_file_obj_list, component_dict)
    if component_list is False:
        return False

    # create bin
    package = CreatePackage(head_list, component_list,
                            update_bin_path.encode("utf-8"),
                            OPTIONS_MANAGER.private_key)
    return bool(package.create_package())


def create_update_bin():
    """
    Call the interface to generate the update.bin file.
    :return update_bin_obj: Update file object.
                            If exception occurs, return False.
    """
    update_bin_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")
    try:
        created = _write_update_bin(update_bin_obj.name)
    except Exception:
        # Do not leave the temporary file open when creation blows up.
        update_bin_obj.close()
        raise

    if not created:
        update_bin_obj.close()
        UPDATE_LOGGER.print_log(
            "Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
        return False

    UPDATE_LOGGER.print_log(
        "Create update package .bin complete! path: %s" % update_bin_obj.name)
    return update_bin_obj


def get_component_list(all_image_file_obj_list, component_dict):
    """
    Get the list of component information according to
    the component information structure.
    :param all_image_file_obj_list: all image object file list
    :param component_dict: Component information content dict, ordered as
                           extended components followed by images
    :return component_list: List of component information.
                            If the hash of a file cannot be obtained,
                            return False.
    """
    pkg_components = PkgComponent * len(component_dict)
    component_list = pkg_components()
    extend_component_list, extend_path_list = _get_extend_components()
    extend_count = len(extend_component_list)

    for idx, (key, component) in enumerate(component_dict.items()):
        # The first entries are the extended components; the remaining
        # ones map, in order, onto the image file objects.
        if idx < extend_count:
            file_path = extend_path_list[idx]
        else:
            file_path = all_image_file_obj_list[idx - extend_count].name
        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
        if not digest:
            return False
        if component is None:
            component = copy.copy(COMPONENT_INFO_INNIT)
            component[0] = key

        file_size = os.path.getsize(file_path)
        entry = component_list[idx]
        entry.digest = (ctypes.c_ubyte * DIGEST_LEN).from_buffer_copy(
            binascii.a2b_hex(digest.encode('utf-8')))
        entry.file_path = file_path.encode("utf-8")
        if not OPTIONS_MANAGER.not_l2:
            entry.component_addr = ('/%s' % component[0]).encode("utf-8")
        else:
            entry.component_addr = ('%s' % component[0]).encode("utf-8")
        entry.version = component[4].encode("utf-8")
        entry.size = file_size
        entry.id = int(component[1])
        # NOTE(review): component[3] is compared before the int()
        # conversion used below, so a string "1" does not match here.
        # Kept as is; confirm the intended type with the producer of
        # component_info_dict.
        if component[3] == 1:
            entry.original_size = file_size
        else:
            entry.original_size = 0
        entry.res_type = int(component[2])
        entry.type = int(component[3])
        entry.flags = IS_DEL
    return component_list


def get_head_list(component_count, head_value_list):
    """
    According to the header structure, get the list of HEAD headers.
    :param component_count: number of components
    :param head_value_list: list of header values, in the order
                            update file version, product update id,
                            software version, date, time
    :return head_list: header list
    """
    head_list = PkgHeader()
    if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256:
        # PKG_DIGEST_TYPE_SHA384 3,use sha384
        head_list.digest_method = 3
    else:
        # PKG_DIGEST_TYPE_SHA256 2,use sha256
        head_list.digest_method = 2

    if OPTIONS_MANAGER.private_key == ON_SERVER:
        head_list.sign_method = 0
    elif OPTIONS_MANAGER.signing_algorithm == "ECC":
        # signing algorithm use ECC
        head_list.sign_method = SignMethod.ECC.value
    else:
        # signing algorithm use RSA
        head_list.sign_method = SignMethod.RSA.value

    head_list.pkg_type = 1
    head_list.pkg_flags = 1 if OPTIONS_MANAGER.not_l2 else 0
    head_list.entry_count = component_count
    head_list.update_file_version = int(head_value_list[0])
    head_list.product_update_id = head_value_list[1].encode("utf-8")
    head_list.software_version = head_value_list[2].encode("utf-8")
    head_list.date = head_value_list[3].encode("utf-8")
    head_list.time = head_value_list[4].encode("utf-8")
    head_list.describe_package_id = \
        ctypes.c_char_p("update/info.bin".encode())
    return head_list


def get_tools_component_list(count, opera_script_dict):
    """
    Get the list of component information according to
    the component information structure.
    :param count: number of components to allocate
    :param opera_script_dict: script file path and script name dict
    :return: (component_list, component_num), the list of component
             information and the number of entries that were filled in
    """
    pkg_components = PkgComponent * count
    component_list = pkg_components()
    component_num = 0
    for i, (script_path, script_name) in \
            enumerate(opera_script_dict.items()):
        component_list[i].file_path = script_path.encode("utf-8")
        component_list[i].component_addr = script_name.encode("utf-8")
        component_num += 1
    return component_list, component_num


def get_tools_head_list(component_count):
    """
    According to the header structure, get the list of HEAD headers.
    :param component_count: number of components
    :return head_list: header list
    """
    head_list = PkgHeader()
    head_list.digest_method = 0
    head_list.sign_method = 0
    head_list.pkg_type = 2
    head_list.pkg_flags = 0
    head_list.entry_count = component_count
    return head_list


def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
    """
    Server update package signature requires the vendor to
    implement its own service signature interface, as shown below:
    ip = ""
    user_name = ""
    pass_word = ""
    signe_jar = ""
    signing_config = [signe_jar, ip, user_name, pass_word,
                      hash_code, hash_algorithm]
    cmd = ' '.join(signing_config)
    subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    :param package_path: update package file path
    :param hash_algorithm: hash algorithm
    :param hash_code: hash code
    :return: signing content as bytes; this placeholder implementation
             only logs its arguments and returns empty bytes
    """
    UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, "
                            "Signing hash code: %s" %
                            (package_path, hash_algorithm, hash_code))
    signing_content = ""
    return signing_content.encode()


def create_build_tools_zip():
    """
    Create the build_tools zip file, containing the operation scripts,
    the updater binary, the total script, the optional register script
    and the hash signed data.
    :return: temporary file object of the zip.
             If the script dict is empty, the updater binary is missing
             or the hash signed data cannot be generated, return False.
    """
    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
    empty_script_dict = {each: [] for each in SCRIPT_KEY_LIST}
    if opera_script_file_name_dict == empty_script_dict:
        UPDATE_LOGGER.print_log(
            "Script dict is null!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    # script file path -> script name inside the zip
    opera_script_dict = {}
    for each_value in opera_script_file_name_dict.values():
        for each in each_value:
            opera_script_dict[each[1].name] = each[0]

    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
    update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
                                   UPDATE_EXE_FILE_NAME)
    if not os.path.exists(update_exe_path):
        UPDATE_LOGGER.print_log(
            "updater_binary file does not exist!path: %s" % update_exe_path,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    file_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
    zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED)
    # file name will be prefixed by build_tools in hash signed data
    name_format_str = "build_tools/{}"
    # (source path, archive name) of every entry, in write order
    entries = list(opera_script_dict.items())
    entries.append((update_exe_path, UPDATE_EXE_FILE_NAME))
    entries.append((total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME))
    if register_script_file_obj is not None:
        entries.append(
            (register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME))

    files_to_sign = []
    try:
        for source_path, archive_name in entries:
            zip_file.write(source_path, archive_name)
            files_to_sign.append(
                (source_path, name_format_str.format(archive_name)))
        # create_hsd_for_build_tools() closes zip_file itself.
        signed = create_hsd_for_build_tools(zip_file, files_to_sign)
    except Exception:
        # Release the archive and the temporary file before propagating.
        zip_file.close()
        file_obj.close()
        raise

    if signed is False:
        file_obj.close()
        return False
    return file_obj


def do_zip_update_package():
    """
    Write the unsigned update package zip to
    OPTIONS_MANAGER.update_package_file_path. It contains update.bin,
    the build tools zip, the version list, the board list, the optional
    max stash size and all incremental patches.
    """
    with zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path,
                         'w', zipfile.ZIP_DEFLATED,
                         allowZip64=True) as zip_file:
        # add update.bin to update package
        zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
        # add build_tools.zip to update package
        zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name,
                       BUILD_TOOLS_FILE_NAME)

        zip_file.write(OPTIONS_MANAGER.version_mbn_file_path, "version_list")
        zip_file.write(OPTIONS_MANAGER.board_list_file_path, "board_list")

        if OPTIONS_MANAGER.max_stash_size != 0:
            # The temporary file is only needed until it has been copied
            # into the archive.
            with tempfile.NamedTemporaryFile(mode="w+") as max_stash_file_obj:
                max_stash_file_obj.write(str(OPTIONS_MANAGER.max_stash_size))
                max_stash_file_obj.flush()
                zip_file.write(max_stash_file_obj.name, "all_max_stash")

        for package_patch_zip in \
                OPTIONS_MANAGER.incremental_block_file_obj_dict.values():
            package_patch_zip.package_block_patch(zip_file)

        for partition, patch_obj in \
                OPTIONS_MANAGER.incremental_image_file_obj_dict.items():
            zip_file.write(patch_obj.name, "%s.patch.dat" % partition)


def create_hsd_for_build_tools(zip_file, files_to_sign):
    """
    generate hash signed data for build_tools.zip
    :param zip_file: opened build tools ZipFile; it is always closed
                     before this function returns or raises
    :param files_to_sign: list of (file path, name in signed data) tuples
    :return: True if the hash signed data was added to the zip,
             False if no signed data could be generated.
    """
    try:
        generate_signed_data_ext = \
            OPTIONS_MANAGER.init.invoke_event(GENERATE_SIGNED_DATA_EVENT)
        # add hash signed data to build_tools.zip
        if generate_signed_data_ext is False:
            signed_data = generate_signed_data_default(files_to_sign)
        else:
            signed_data = generate_signed_data_ext(files_to_sign)
        if signed_data == "":
            UPDATE_LOGGER.print_log("generate_signed_data failed",
                                    log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        zip_file.writestr("hash_signed_data", signed_data)
        return True
    finally:
        zip_file.close()


def build_update_package(no_zip, update_package, prelude_script,
                         verse_script, refrain_script, ending_script):
    """
    Create the update package file.
    :param no_zip: no zip; if set, only the .bin file is written
    :param update_package: update package path
    :param prelude_script: prelude object
    :param verse_script: verse object
    :param refrain_script: refrain object
    :param ending_script: ending object
    :return: True on success. If exception occurs, return False.
    """
    update_bin_obj = create_update_bin()
    if not update_bin_obj:
        return False
    OPTIONS_MANAGER.update_bin_obj = update_bin_obj

    if OPTIONS_MANAGER.sd_card:
        package_type = "sd"
    elif OPTIONS_MANAGER.source_package:
        package_type = "diff"
    else:
        package_type = "full"
    if OPTIONS_MANAGER.not_l2:
        update_file_name = ''.join(
            ["updater_",
             OPTIONS_MANAGER.target_package_version.replace(" ", "_")])
    else:
        update_file_name = ''.join(
            ["updater_", package_type])

    if not no_zip:
        update_package_path = os.path.join(
            update_package, '%s_unsigned.zip' % update_file_name)
        OPTIONS_MANAGER.update_package_file_path = update_package_path

        create_script(prelude_script, verse_script,
                      refrain_script, ending_script)

        build_tools_zip_obj = create_build_tools_zip()
        if build_tools_zip_obj is False:
            UPDATE_LOGGER.print_log(
                "Create build tools zip failed!",
                log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj

        do_zip_update_package()

        signed_package = os.path.join(
            update_package, "%s.zip" % update_file_name)
        OPTIONS_MANAGER.signed_package = signed_package
        if os.path.exists(signed_package):
            os.remove(signed_package)

        # Prefer a registered signing extension over the default signer.
        sign_ota_package = \
            OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
        if sign_ota_package:
            sign_result = sign_ota_package()
        else:
            sign_result = sign_package()

        if not sign_result:
            UPDATE_LOGGER.print_log(
                "Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
            return False
        # The unsigned intermediate zip is no longer needed.
        if os.path.exists(update_package_path):
            os.remove(update_package_path)
    else:
        update_package_path = os.path.join(
            update_package, '%s.bin' % update_file_name)
        if os.path.exists(update_package_path):
            os.remove(update_package_path)
        OPTIONS_MANAGER.update_package_file_path = update_package_path
        # Stream the copy instead of loading the whole file into memory.
        shutil.copyfile(OPTIONS_MANAGER.update_bin_obj.name,
                        update_package_path)
    return True


def get_hash_content(file_path, hash_algorithm):
    """
    Use the command configured in LINUX_HASH_ALGORITHM_DICT
    (e.g. SHA256SUM) to get the hash value of the file.
    :param file_path : file path
    :param hash_algorithm: hash algorithm
    :return hash_content: hash value.
                          If the hash algorithm is unsupported,
                          return False.
    :raise RuntimeError: if the file does not exist or the length of the
                         produced hash does not match
                         HASH_CONTENT_LEN_DICT.
    """
    try:
        hash_cmd = LINUX_HASH_ALGORITHM_DICT[hash_algorithm]
    except KeyError:
        UPDATE_LOGGER.print_log(
            "Unsupported hash algorithm! %s" % hash_algorithm,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    if not os.path.exists(file_path):
        UPDATE_LOGGER.print_log(
            "%s failed!" % hash_cmd,
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError("File does not exist: %s" % file_path)

    process_obj = subprocess.Popen(
        [hash_cmd, file_path], shell=False,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() drains the pipe while waiting, which avoids the
    # deadlock that wait() followed by read() can run into.
    output, _ = process_obj.communicate()
    # Only the first token (the hash) is decoded, so a file path that is
    # not valid in this encoding cannot break the decoding.
    hash_content = output.split(b' ')[0].decode(encoding='gbk')
    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
        UPDATE_LOGGER.print_log(
            "Get hash content failed! The length of the hash_content is 0!",
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError(
            "Unexpected hash length for file: %s" % file_path)
    if process_obj.returncode == 0:
        UPDATE_LOGGER.print_log(
            "Get hash content success! path: %s" % file_path)
    return hash_content