1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2021 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import binascii 17import copy 18import os 19import subprocess 20import tempfile 21import time 22import collections as collect 23import enum 24import ctypes 25import zipfile 26 27from log_exception import UPDATE_LOGGER 28from script_generator import create_script 29from utils import sign_package 30from utils import HASH_CONTENT_LEN_DICT 31from utils import OPTIONS_MANAGER 32from utils import REGISTER_SCRIPT_FILE_NAME 33from utils import ON_SERVER 34from utils import SCRIPT_KEY_LIST 35from utils import EXTEND_OPTIONAL_COMPONENT_LIST 36from utils import COMPONENT_INFO_INNIT 37from utils import UPDATE_EXE_FILE_NAME 38from utils import TOTAL_SCRIPT_FILE_NAME 39from utils import EXTEND_PATH_EVENT 40from utils import LINUX_HASH_ALGORITHM_DICT 41from utils import UPDATE_BIN_FILE_NAME 42from utils import BUILD_TOOLS_FILE_NAME 43from utils import SIGN_PACKAGE_EVENT 44from utils import CHECK_BINARY_EVENT 45from utils import ZIP_EVENT 46from utils import GENERATE_SIGNED_DATA_EVENT 47from utils import DECOUPLED_EVENT 48from utils import get_extend_path_list 49from create_update_package import CreatePackage 50from create_update_package import SIGN_ALGO_RSA 51from create_update_package import SIGN_ALGO_PSS 52from create_signed_data import generate_signed_data_default 53 54IS_DEL = 0 55SIGNING_LENGTH_256 = 256 56DIGEST_LEN = 32 57HASH_VALUE_MAX_LEN = 128 58 59 60class SignMethod(enum.Enum): 61 RSA = 1 62 ECC = 2 63 64 65class PkgHeader(ctypes.Structure): 66 _fields_ = [("digest_method", ctypes.c_ubyte), 67 ("sign_method", ctypes.c_ubyte), 68 ("pkg_type", ctypes.c_ubyte), 69 ("pkg_flags", ctypes.c_ubyte), 70 ("entry_count", ctypes.c_int), 71 ("update_file_version", ctypes.c_int), 72 ("product_update_id", ctypes.c_char_p), 73 ("software_version", ctypes.c_char_p), 74 ("date", ctypes.c_char_p), 75 ("time", ctypes.c_char_p), 76 ("describe_package_id", ctypes.c_char_p)] 77 78 79class PkgComponent(ctypes.Structure): 80 _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN), 81 ("file_path", ctypes.c_char_p), 82 ("component_addr", ctypes.c_char_p), 83 ("version", ctypes.c_char_p), 84 ("size", ctypes.c_int), 85 ("id", ctypes.c_int), 86 ("original_size", ctypes.c_int), 87 ("res_type", ctypes.c_ubyte), 88 ("type", ctypes.c_ubyte), 89 ("flags", ctypes.c_ubyte)] 90 91 92class SignInfo(ctypes.Structure): 93 _fields_ = [("sign_offset", ctypes.c_int), 94 ("hash_len", ctypes.c_int), 95 ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))] 96 97 98def create_update_bin(): 99 """ 100 Call the interface to generate the update.bin file. 101 :return update_bin_obj: Update file object. 102 If exception occurs, return False. 103 """ 104 update_bin_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="update_bin-") 105 106 head_value_list = OPTIONS_MANAGER.head_info_list 107 component_dict = OPTIONS_MANAGER.component_info_dict 108 full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list 109 full_img_list = OPTIONS_MANAGER.full_img_list 110 111 extend_component_list = get_extend_path_list() 112 if not OPTIONS_MANAGER.not_l2: 113 if OPTIONS_MANAGER.partition_file_obj is not None: 114 all_image_name = \ 115 extend_component_list + EXTEND_OPTIONAL_COMPONENT_LIST + full_img_list 116 else: 117 all_image_name = extend_component_list + full_img_list 118 else: 119 all_image_name = full_img_list 120 sort_component_dict = collect.OrderedDict() 121 for each_image_name in all_image_name: 122 sort_component_dict[each_image_name] = component_dict.get(each_image_name) 123 component_dict = copy.deepcopy(sort_component_dict) 124 head_list = get_head_list(len(component_dict), head_value_list) 125 126 component_list = get_component_list( 127 full_image_file_obj_list, component_dict) 128 129 save_patch = update_bin_obj.name.encode("utf-8") 130 if OPTIONS_MANAGER.private_key == ON_SERVER: 131 private_key = "./update_package.py" 132 else: 133 private_key = OPTIONS_MANAGER.private_key.encode("utf-8") 134 135 if OPTIONS_MANAGER.not_l2: 136 sign_algo = SIGN_ALGO_PSS 137 else: 138 sign_algo = SIGN_ALGO_RSA 139 140 # create bin 141 package = CreatePackage(head_list, component_list, save_patch, OPTIONS_MANAGER.private_key) 142 if not package.create_package(): 143 UPDATE_LOGGER.print_log("Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG) 144 return False 145 146 UPDATE_LOGGER.print_log("Create update package .bin complete! path: %s" % update_bin_obj.name) 147 return update_bin_obj 148 149 150def get_component_list(all_image_file_obj_list, component_dict): 151 """ 152 Get the list of component information according to 153 the component information structure. 154 :param all_image_file_obj_list: all image object file list 155 :param component_dict: Component information content dict 156 :return component_list: List of component information. 157 If exception occurs, return False. 158 """ 159 pkg_components = PkgComponent * len(component_dict) 160 component_list = pkg_components() 161 extend_list = get_extend_path_list() 162 if not OPTIONS_MANAGER.not_l2: 163 if OPTIONS_MANAGER.partition_file_obj is not None: 164 extend_component_list = extend_list + EXTEND_OPTIONAL_COMPONENT_LIST 165 extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path, 166 OPTIONS_MANAGER.board_list_file_path, 167 OPTIONS_MANAGER.partition_file_obj.name] 168 else: 169 extend_component_list = extend_list 170 extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path, 171 OPTIONS_MANAGER.board_list_file_path] 172 else: 173 extend_component_list = [] 174 extend_path_list = [] 175 get_path_list = OPTIONS_MANAGER.init.invoke_event(EXTEND_PATH_EVENT) 176 if get_path_list: 177 extend_path_list = get_path_list() 178 idx = 0 179 for key, component in component_dict.items(): 180 if idx < len(extend_component_list): 181 file_path = extend_path_list[idx] 182 else: 183 file_path = all_image_file_obj_list[idx - len(extend_component_list)].name 184 digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm) 185 if not digest: 186 return 187 if component is None: 188 component = copy.copy(COMPONENT_INFO_INNIT) 189 component[0] = key 190 component_list[idx].digest = (ctypes.c_ubyte * 32).from_buffer_copy( 191 binascii.a2b_hex(digest.encode('utf-8'))) 192 component_list[idx].file_path = file_path.encode("utf-8") 193 if not OPTIONS_MANAGER.not_l2: 194 component_list[idx].component_addr = ('/%s' % component[0]).encode("utf-8") 195 else: 196 component_list[idx].component_addr = ('%s' % component[0]).encode("utf-8") 197 component_list[idx].version = component[4].encode("utf-8") 198 component_list[idx].size = os.path.getsize(file_path) 199 component_list[idx].id = int(component[1]) 200 if component[3] == 1: 201 component_list[idx].original_size = os.path.getsize(file_path) 202 else: 203 component_list[idx].original_size = 0 204 component_list[idx].res_type = int(component[2]) 205 component_list[idx].type = int(component[3]) 206 component_list[idx].flags = IS_DEL 207 208 idx += 1 209 return component_list 210 211 212def get_head_list(component_count, head_value_list): 213 """ 214 According to the header structure, get the list of HEAD headers. 215 :param component_count: number of components 216 :param head_value_list: list of header values 217 :return head_list: header list 218 """ 219 head_list = PkgHeader() 220 if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256: 221 # PKG_DIGEST_TYPE_SHA384 3,use sha384 222 head_list.digest_method = 3 223 else: 224 # PKG_DIGEST_TYPE_SHA256 2,use sha256 225 head_list.digest_method = 2 226 if OPTIONS_MANAGER.private_key == ON_SERVER: 227 head_list.sign_method = 0 228 else: 229 if OPTIONS_MANAGER.signing_algorithm == "ECC": 230 # signing algorithm use ECC 231 head_list.sign_method = SignMethod.ECC.value 232 else: 233 # signing algorithm use RSA 234 head_list.sign_method = SignMethod.RSA.value 235 head_list.pkg_type = 1 236 if OPTIONS_MANAGER.not_l2: 237 head_list.pkg_flags = 1 238 else: 239 head_list.pkg_flags = 0 240 head_list.entry_count = component_count 241 head_list.update_file_version = int(head_value_list[0]) 242 head_list.product_update_id = head_value_list[1].encode("utf-8") 243 head_list.software_version = head_value_list[2].encode("utf-8") 244 head_list.date = head_value_list[3].encode("utf-8") 245 head_list.time = head_value_list[4].encode("utf-8") 246 head_list.describe_package_id = ctypes.c_char_p("update/info.bin".encode()) 247 return head_list 248 249 250def get_tools_component_list(count, opera_script_dict): 251 """ 252 Get the list of component information according to 253 the component information structure. 254 :param count: number of components 255 :param opera_script_dict: script file name and path dict 256 :return component_list: list of component information. 257 If exception occurs, return False. 258 """ 259 pkg_components = PkgComponent * count 260 component_list = pkg_components() 261 component_value_list = list(opera_script_dict.keys()) 262 component_num = 0 263 for i, component in enumerate(component_value_list): 264 component_list[i].file_path = component.encode("utf-8") 265 component_list[i].component_addr = \ 266 (opera_script_dict[component]).encode("utf-8") 267 component_num += 1 268 return component_list, component_num 269 270 271def get_tools_head_list(component_count): 272 """ 273 According to the header structure, get the list of HEAD headers. 274 :param component_count: number of components 275 :return head_list: header list 276 """ 277 head_list = PkgHeader() 278 head_list.digest_method = 0 279 head_list.sign_method = 0 280 head_list.pkg_type = 2 281 head_list.pkg_flags = 0 282 head_list.entry_count = component_count 283 return head_list 284 285 286def get_signing_from_server(package_path, hash_algorithm, hash_code=None): 287 """ 288 Server update package signature requires the vendor to 289 implement its own service signature interface, as shown below: 290 ip = "" 291 user_name = "" 292 pass_word = "" 293 signe_jar = "" 294 signing_config = [signe_jar, ip, user_name, pass_word, 295 hash_code, hash_algorithm] 296 cmd = ' '.join(signing_config) 297 subprocess.Popen( 298 cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 299 :param package_path: update package file path 300 :param hash_algorithm: hash algorithm 301 :param hash_code: hash code 302 :return: 303 """ 304 UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, " 305 "Signing hash code: %s" % 306 (package_path, hash_algorithm, hash_code)) 307 signing_content = "" 308 return signing_content.encode() 309 310 311def create_build_tools_zip(): 312 """ 313 Create the update package file. 314 :param lib: lib object 315 :return: 316 """ 317 opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict 318 tmp_dict = {} 319 for each in SCRIPT_KEY_LIST: 320 tmp_dict[each] = [] 321 if opera_script_file_name_dict == tmp_dict: 322 UPDATE_LOGGER.print_log( 323 "Script dict is null!", 324 log_type=UPDATE_LOGGER.ERROR_LOG) 325 return False 326 count = 0 327 opera_script_dict = {} 328 for each_value in opera_script_file_name_dict.values(): 329 for each in each_value: 330 opera_script_dict[each[1].name] = each[0] 331 count += 1 332 # other_file_count --> 1(updater_binary) + 1(loadScript.us) 333 other_file_count = 2 334 count += other_file_count 335 if OPTIONS_MANAGER.register_script_file_obj is not None: 336 count += 1 337 head_list = get_tools_head_list(count) 338 component_list, num = get_tools_component_list(count, opera_script_dict) 339 total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj 340 register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj 341 update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, UPDATE_EXE_FILE_NAME) 342 343 file_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="build_tools-") 344 files_to_sign = [] 345 zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED) 346 # file name will be prefixed by build_tools in hash signed data 347 name_format_str = "build_tools/{}" 348 # add opera_script to build_tools.zip 349 for key, value in opera_script_dict.items(): 350 zip_file.write(key, value) 351 files_to_sign += [(key, name_format_str.format(value))] 352 binary_check = OPTIONS_MANAGER.init.invoke_event(CHECK_BINARY_EVENT) 353 if callable(binary_check) is False or (callable(binary_check) and binary_check() is False): 354 if not os.path.exists(update_exe_path): 355 UPDATE_LOGGER.print_log("updater_binary file does not exist!path: %s" % update_exe_path, 356 log_type=UPDATE_LOGGER.ERROR_LOG) 357 return False 358 # add update_binary to build_tools.zip 359 zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME) 360 files_to_sign += [(update_exe_path, name_format_str.format(UPDATE_EXE_FILE_NAME))] 361 362 # add loadScript to build_tools.zip 363 zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME) 364 files_to_sign += [(total_script_file_obj.name, name_format_str.format(TOTAL_SCRIPT_FILE_NAME))] 365 if OPTIONS_MANAGER.register_script_file_obj is not None: 366 zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME) 367 files_to_sign += [(register_script_file_obj.name, name_format_str.format(REGISTER_SCRIPT_FILE_NAME))] 368 369 if create_hsd_for_build_tools(zip_file, files_to_sign) is False: 370 zip_file.close() 371 return False 372 zip_file.close() 373 return file_obj 374 375 376def do_sign_package(update_package, update_file_name): 377 signed_package = os.path.join( 378 update_package, "%s.zip" % update_file_name) 379 OPTIONS_MANAGER.signed_package = signed_package 380 if os.path.exists(signed_package): 381 os.remove(signed_package) 382 383 sign_ota_package = \ 384 OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT) 385 if sign_ota_package: 386 return sign_ota_package() 387 else: 388 return sign_package() 389 390 391def get_update_file_name(): 392 if OPTIONS_MANAGER.sd_card : 393 package_type = "sd" 394 elif OPTIONS_MANAGER.source_package : 395 package_type = "diff" 396 else : 397 package_type = "full" 398 if OPTIONS_MANAGER.not_l2: 399 update_file_name = ''.join( 400 ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")]) 401 else : 402 update_file_name = ''.join( 403 ["updater_", package_type]) 404 return update_file_name 405 406 407def do_zip_update_package(): 408 zip_file = zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path, 409 'w', zipfile.ZIP_DEFLATED, allowZip64=True) 410 # add files to update package 411 do_add_files = OPTIONS_MANAGER.init.invoke_event(ZIP_EVENT) 412 if callable(do_add_files) and do_add_files(zip_file) is False: 413 UPDATE_LOGGER.print_log("add files fail", UPDATE_LOGGER.ERROR_LOG) 414 zip_file.close() 415 return False 416 # add update.bin to update package 417 zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin") 418 # add build_tools.zip to update package 419 zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME) 420 421 zip_file.write(OPTIONS_MANAGER.board_list_file_path, "board_list") 422 decouple_res = OPTIONS_MANAGER.init.invoke_event(DECOUPLED_EVENT) 423 if decouple_res is False: 424 zip_file.write(OPTIONS_MANAGER.version_mbn_file_path, "version_list") 425 426 if OPTIONS_MANAGER.max_stash_size != 0: 427 max_stash_file_obj = tempfile.NamedTemporaryFile(mode="w+") 428 max_stash_file_obj.write(str(OPTIONS_MANAGER.max_stash_size)) 429 max_stash_file_obj.flush() 430 zip_file.write(max_stash_file_obj.name, "all_max_stash") 431 432 for package_patch_zip in OPTIONS_MANAGER.incremental_block_file_obj_dict.values(): 433 package_patch_zip.package_block_patch(zip_file) 434 435 for partition, patch_obj in OPTIONS_MANAGER.incremental_image_file_obj_dict.items(): 436 zip_file.write(patch_obj.name, "%s.patch.dat" % partition) 437 438 zip_file.close() 439 return True 440 441 442def create_hsd_for_build_tools(zip_file, files_to_sign): 443 """ 444 generate hash signed data for build_tools.zip 445 """ 446 generate_signed_data_ext = OPTIONS_MANAGER.init.invoke_event(GENERATE_SIGNED_DATA_EVENT) 447 signed_data = "" 448 # add hash signed data to build_tools.zip 449 if generate_signed_data_ext is False: 450 signed_data = generate_signed_data_default(files_to_sign) 451 else: 452 signed_data = generate_signed_data_ext(files_to_sign) 453 if signed_data == "": 454 UPDATE_LOGGER.print_log("generate_signed_data failed", log_type=UPDATE_LOGGER.ERROR_LOG) 455 zip_file.close() 456 return False 457 zip_file.writestr("hash_signed_data", signed_data) 458 return True 459 460 461def build_update_package(no_zip, update_package, prelude_script, 462 verse_script, refrain_script, ending_script): 463 """ 464 Create the update package file. 465 :param no_zip: no zip 466 :param update_package: update package path 467 :param prelude_script: prelude object 468 :param verse_script: verse object 469 :param refrain_script: refrain object 470 :param ending_script: ending object 471 :return: If exception occurs, return False. 472 """ 473 update_bin_obj = create_update_bin() 474 if update_bin_obj: 475 OPTIONS_MANAGER.update_bin_obj = update_bin_obj 476 else: 477 return False 478 479 update_file_name = get_update_file_name() 480 481 if not no_zip: 482 update_package_path = os.path.join( 483 update_package, '%s_unsigned.zip' % update_file_name) 484 OPTIONS_MANAGER.update_package_file_path = update_package_path 485 486 create_script(prelude_script, verse_script, 487 refrain_script, ending_script) 488 489 build_tools_zip_obj = create_build_tools_zip() 490 if build_tools_zip_obj is False: 491 UPDATE_LOGGER.print_log( 492 "Create build tools zip failed!", 493 log_type=UPDATE_LOGGER.ERROR_LOG) 494 return False 495 OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj 496 497 if not do_zip_update_package(): 498 UPDATE_LOGGER.print_log("Zip update package fail", UPDATE_LOGGER.ERROR_LOG) 499 return False 500 501 sign_result = do_sign_package(update_package, update_file_name) 502 503 if not sign_result: 504 UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG) 505 return False 506 if os.path.exists(update_package_path): 507 os.remove(update_package_path) 508 else: 509 update_package_path = os.path.join( 510 update_package, '%s.bin' % update_file_name) 511 if os.path.exists(update_package_path): 512 os.remove(update_package_path) 513 OPTIONS_MANAGER.update_package_file_path = update_package_path 514 with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f: 515 content = r_f.read() 516 with open(update_package_path, 'wb') as w_f: 517 w_f.write(content) 518 return True 519 520 521def get_hash_content(file_path, hash_algorithm): 522 """ 523 Use SHA256SUM to get the hash value of the file. 524 :param file_path : file path 525 :param hash_algorithm: hash algorithm 526 :return hash_content: hash value 527 """ 528 try: 529 cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path] 530 except KeyError: 531 UPDATE_LOGGER.print_log( 532 "Unsupported hash algorithm! %s" % hash_algorithm, 533 log_type=UPDATE_LOGGER.ERROR_LOG) 534 return False 535 if not os.path.exists(file_path): 536 UPDATE_LOGGER.print_log( 537 "%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm], 538 UPDATE_LOGGER.ERROR_LOG) 539 raise RuntimeError 540 process_obj = subprocess.Popen( 541 cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 542 process_obj.wait() 543 hash_content = \ 544 process_obj.stdout.read().decode(encoding='gbk').split(' ')[0] 545 if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm): 546 UPDATE_LOGGER.print_log( 547 "Get hash content failed! The length of the hash_content is 0!", 548 UPDATE_LOGGER.ERROR_LOG) 549 raise RuntimeError 550 if process_obj.returncode == 0: 551 UPDATE_LOGGER.print_log( 552 "Get hash content success! path: %s" % file_path) 553 return hash_content 554