1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2021 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import binascii 17import copy 18import os 19import subprocess 20import tempfile 21import time 22import collections as collect 23import enum 24import ctypes 25 26from log_exception import UPDATE_LOGGER 27from script_generator import create_script 28from utils import HASH_CONTENT_LEN_DICT 29from utils import OPTIONS_MANAGER 30from utils import REGISTER_SCRIPT_FILE_NAME 31from utils import ON_SERVER 32from utils import SCRIPT_KEY_LIST 33from utils import EXTEND_OPTIONAL_COMPONENT_LIST 34from utils import COMPONENT_INFO_INNIT 35from utils import UPDATE_EXE_FILE_NAME 36from utils import TOTAL_SCRIPT_FILE_NAME 37from utils import EXTEND_COMPONENT_LIST 38from utils import LINUX_HASH_ALGORITHM_DICT 39from utils import BUILD_TOOLS_FILE_NAME 40from utils import get_lib_api 41 42IS_DEL = 0 43SIGNING_LENGTH_256 = 256 44DIGEST_LEN = 32 45HASH_VALUE_MAX_LEN = 128 46 47 48class SignMethod(enum.Enum): 49 RSA = 1 50 ECC = 2 51 52 53class PkgHeader(ctypes.Structure): 54 _fields_ = [("digest_method", ctypes.c_ubyte), 55 ("sign_method", ctypes.c_ubyte), 56 ("pkg_type", ctypes.c_ubyte), 57 ("pkg_flags", ctypes.c_ubyte), 58 ("entry_count", ctypes.c_int), 59 ("update_file_version", ctypes.c_int), 60 ("product_update_id", ctypes.c_char_p), 61 ("software_version", ctypes.c_char_p), 62 ("date", ctypes.c_char_p), 63 ("time", ctypes.c_char_p), 64 ("describe_package_id", ctypes.c_char_p)] 65 66 67class PkgComponent(ctypes.Structure): 68 _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN), 69 ("file_path", ctypes.c_char_p), 70 ("component_addr", ctypes.c_char_p), 71 ("version", ctypes.c_char_p), 72 ("size", ctypes.c_int), 73 ("id", ctypes.c_int), 74 ("original_size", ctypes.c_int), 75 ("res_type", ctypes.c_ubyte), 76 ("type", ctypes.c_ubyte), 77 ("flags", ctypes.c_ubyte)] 78 79 80class SignInfo(ctypes.Structure): 81 _fields_ = [("sign_offset", ctypes.c_int), 82 ("hash_len", ctypes.c_int), 83 ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))] 84 85 86def create_update_bin(): 87 """ 88 Call the interface to generate the update.bin file. 89 :return update_bin_obj: Update file object. 90 If exception occurs, return False. 91 """ 92 update_bin_obj = tempfile.NamedTemporaryFile(prefix="update_bin-") 93 94 head_value_list = OPTIONS_MANAGER.head_info_list 95 component_dict = OPTIONS_MANAGER.component_info_dict 96 full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list 97 full_img_list = OPTIONS_MANAGER.full_img_list 98 incremental_img_list = OPTIONS_MANAGER.incremental_img_list 99 incremental_image_file_obj_list = \ 100 OPTIONS_MANAGER.incremental_image_file_obj_list 101 102 all_image_file_obj_list = \ 103 incremental_image_file_obj_list + full_image_file_obj_list 104 if not OPTIONS_MANAGER.not_l2: 105 if OPTIONS_MANAGER.partition_file_obj is not None: 106 all_image_name = \ 107 EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + \ 108 incremental_img_list + full_img_list 109 else: 110 all_image_name = \ 111 EXTEND_COMPONENT_LIST + incremental_img_list + full_img_list 112 else: 113 all_image_name = \ 114 incremental_img_list + full_img_list 115 sort_component_dict = collect.OrderedDict() 116 for each_image_name in all_image_name: 117 sort_component_dict[each_image_name] = \ 118 component_dict.get(each_image_name) 119 component_dict = copy.deepcopy(sort_component_dict) 120 head_list = get_head_list(len(component_dict), head_value_list) 121 122 component_list = get_component_list( 123 all_image_file_obj_list, component_dict) 124 125 save_patch = update_bin_obj.name.encode("utf-8") 126 sign_info = SignInfo() 127 if OPTIONS_MANAGER.private_key == ON_SERVER: 128 private_key = "./update_package.py" 129 else: 130 private_key = OPTIONS_MANAGER.private_key.encode("utf-8") 131 lib = get_lib_api() 132 lib_l1 = get_lib_api(is_l2=False) 133 if OPTIONS_MANAGER.not_l2: 134 lib_l1.CreatePackageWithSignInfo( 135 ctypes.pointer(head_list), component_list, save_patch, 136 private_key, ctypes.pointer(sign_info)) 137 138 offset = sign_info.sign_offset 139 hash_code = bytes(sign_info.hash_code).decode('ascii') 140 else: 141 lib.CreatePackage( 142 ctypes.pointer(head_list), component_list, save_patch, 143 OPTIONS_MANAGER.private_key.encode("utf-8")) 144 offset = 0 145 hash_code = b"" 146 147 if OPTIONS_MANAGER.private_key == ON_SERVER: 148 signing_package(update_bin_obj.name, 149 OPTIONS_MANAGER.hash_algorithm, hash_code=hash_code, 150 position=offset) 151 152 UPDATE_LOGGER.print_log(".bin package signing success!") 153 UPDATE_LOGGER.print_log( 154 "Create update package .bin complete! path: %s" % update_bin_obj.name) 155 return update_bin_obj, lib 156 157 158def get_component_list(all_image_file_obj_list, component_dict): 159 """ 160 Get the list of component information according to 161 the component information structure. 162 :param all_image_file_obj_list: all image object file list 163 :param component_dict: Component information content dict 164 :return component_list: List of component information. 165 If exception occurs, return False. 166 """ 167 pkg_components = PkgComponent * len(component_dict) 168 component_list = pkg_components() 169 if not OPTIONS_MANAGER.not_l2: 170 if OPTIONS_MANAGER.partition_file_obj is not None: 171 extend_component_list = \ 172 EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST 173 extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path, 174 OPTIONS_MANAGER.board_list_file_path, 175 OPTIONS_MANAGER.partition_file_obj.name] 176 else: 177 extend_component_list = EXTEND_COMPONENT_LIST 178 extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path, 179 OPTIONS_MANAGER.board_list_file_path] 180 else: 181 extend_component_list = [] 182 extend_path_list = [] 183 idx = 0 184 for key, component in component_dict.items(): 185 if idx < len(extend_component_list): 186 file_path = extend_path_list[idx] 187 else: 188 file_path = \ 189 all_image_file_obj_list[idx - len(extend_component_list)].name 190 digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm) 191 if digest is None: 192 return 193 if component is None: 194 component = copy.copy(COMPONENT_INFO_INNIT) 195 component[0] = key 196 component_list[idx].digest = (ctypes.c_ubyte * 32).from_buffer_copy( 197 binascii.a2b_hex(digest.encode('utf-8'))) 198 component_list[idx].file_path = file_path.encode("utf-8") 199 if not OPTIONS_MANAGER.not_l2: 200 component_list[idx].component_addr = \ 201 ('/%s' % component[0]).encode("utf-8") 202 else: 203 component_list[idx].component_addr = \ 204 ('%s' % component[0]).encode("utf-8") 205 component_list[idx].version = component[4].encode("utf-8") 206 component_list[idx].size = os.path.getsize(file_path) 207 component_list[idx].id = int(component[1]) 208 if component[3] == 1: 209 component_list[idx].original_size = os.path.getsize(file_path) 210 else: 211 component_list[idx].original_size = 0 212 component_list[idx].res_type = int(component[2]) 213 component_list[idx].type = int(component[3]) 214 component_list[idx].flags = IS_DEL 215 216 idx += 1 217 return component_list 218 219 220def get_head_list(component_count, head_value_list): 221 """ 222 According to the header structure, get the list of HEAD headers. 223 :param component_count: number of components 224 :param head_value_list: list of header values 225 :return head_list: header list 226 """ 227 head_list = PkgHeader() 228 if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256: 229 # PKG_DIGEST_TYPE_SHA384 3,use sha384 230 head_list.digest_method = 3 231 else: 232 # PKG_DIGEST_TYPE_SHA256 2,use sha256 233 head_list.digest_method = 2 234 if OPTIONS_MANAGER.private_key == ON_SERVER: 235 head_list.sign_method = 0 236 else: 237 if OPTIONS_MANAGER.signing_algorithm == "ECC": 238 # signing algorithm use ECC 239 head_list.sign_method = SignMethod.ECC.value 240 else: 241 # signing algorithm use RSA 242 head_list.sign_method = SignMethod.RSA.value 243 head_list.pkg_type = 1 244 if OPTIONS_MANAGER.not_l2: 245 head_list.pkg_flags = 1 246 else: 247 head_list.pkg_flags = 0 248 head_list.entry_count = component_count 249 head_list.update_file_version = int(head_value_list[0]) 250 head_list.product_update_id = head_value_list[1].encode("utf-8") 251 head_list.software_version = head_value_list[2].encode("utf-8") 252 head_list.date = head_value_list[3].encode("utf-8") 253 head_list.time = head_value_list[4].encode("utf-8") 254 head_list.describe_package_id = ctypes.c_char_p("update/info.bin".encode()) 255 return head_list 256 257 258def get_tools_component_list(count, opera_script_dict): 259 """ 260 Get the list of component information according to 261 the component information structure. 262 :param count: number of components 263 :param opera_script_dict: script file name and path dict 264 :return component_list: list of component information. 265 If exception occurs, return False. 266 """ 267 pkg_components = PkgComponent * count 268 component_list = pkg_components() 269 component_value_list = list(opera_script_dict.keys()) 270 component_num = 0 271 for i, component in enumerate(component_value_list): 272 component_list[i].file_path = component.encode("utf-8") 273 component_list[i].component_addr = \ 274 (opera_script_dict[component]).encode("utf-8") 275 component_num += 1 276 return component_list, component_num 277 278 279def get_tools_head_list(component_count): 280 """ 281 According to the header structure, get the list of HEAD headers. 282 :param component_count: number of components 283 :return head_list: header list 284 """ 285 head_list = PkgHeader() 286 head_list.digest_method = 0 287 head_list.sign_method = 0 288 head_list.pkg_type = 2 289 head_list.pkg_flags = 0 290 head_list.entry_count = component_count 291 return head_list 292 293 294def get_signing_from_server(package_path, hash_algorithm, hash_code=None): 295 """ 296 Server update package signature requires the vendor to 297 implement its own service signature interface, as shown below: 298 ip = "" 299 user_name = "" 300 pass_word = "" 301 signe_jar = "" 302 signing_config = [signe_jar, ip, user_name, pass_word, 303 hash_code, hash_algorithm] 304 cmd = ' '.join(signing_config) 305 subprocess.Popen( 306 cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 307 :param package_path: update package file path 308 :param hash_algorithm: hash algorithm 309 :param hash_code: hash code 310 :return: 311 """ 312 UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, " 313 "Signing hash code: %s" % 314 (package_path, hash_algorithm, hash_code)) 315 signing_content = "" 316 return signing_content.encode() 317 318 319def signing_package(package_path, hash_algorithm, hash_code=None, 320 position=0, package_type='.bin'): 321 """ 322 Update package signature. 323 :param package_path: update package file path 324 :param hash_algorithm: hash algorithm 325 :param position: signature write location 326 :param hash_code: hash code 327 :param package_type: the type of package,.bin/.zip 328 :return: 329 """ 330 try: 331 signing_content = get_signing_from_server( 332 package_path, hash_algorithm, hash_code) 333 if position != 0: 334 with open(package_path, mode='rb+') as f_r: 335 f_r.seek(position) 336 f_r.write(signing_content) 337 else: 338 with open(package_path, mode='ab') as f_w: 339 f_w.write(signing_content) 340 return True 341 except (OSError, TypeError): 342 UPDATE_LOGGER.print_log("%s package signing failed!" % package_type) 343 raise OSError 344 345 346def create_build_tools_zip(lib): 347 """ 348 Create the update package file. 349 :param lib: lib object 350 :return: 351 """ 352 opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict 353 tmp_dict = {} 354 for each in SCRIPT_KEY_LIST: 355 tmp_dict[each] = [] 356 if opera_script_file_name_dict == tmp_dict: 357 UPDATE_LOGGER.print_log( 358 "Script dict is null!", 359 log_type=UPDATE_LOGGER.ERROR_LOG) 360 return False 361 count = 0 362 opera_script_dict = {} 363 for each_value in opera_script_file_name_dict.values(): 364 for each in each_value: 365 opera_script_dict[each[1].name] = each[0] 366 count += 1 367 # other_file_count --> 1(updater_binary) + 1(loadScript.us) 368 other_file_count = 2 369 count += other_file_count 370 if OPTIONS_MANAGER.register_script_file_obj is not None: 371 count += 1 372 head_list = get_tools_head_list(count) 373 component_list, num = \ 374 get_tools_component_list(count, opera_script_dict) 375 total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj 376 register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj 377 update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, 378 UPDATE_EXE_FILE_NAME) 379 if not os.path.exists(update_exe_path): 380 UPDATE_LOGGER.print_log( 381 "updater_binary file does not exist!path: %s" % update_exe_path, 382 log_type=UPDATE_LOGGER.ERROR_LOG) 383 return False 384 file_obj = tempfile.NamedTemporaryFile(prefix="build_tools-") 385 file_save_patch = file_obj.name.encode("utf-8") 386 component_list[num].file_path = update_exe_path.encode("utf-8") 387 component_list[num].component_addr = \ 388 UPDATE_EXE_FILE_NAME.encode("utf-8") 389 component_list[num + 1].file_path = \ 390 total_script_file_obj.name.encode("utf-8") 391 component_list[num + 1].component_addr = \ 392 TOTAL_SCRIPT_FILE_NAME.encode("utf-8") 393 394 if OPTIONS_MANAGER.register_script_file_obj is not None: 395 component_list[num + 2].file_path = \ 396 register_script_file_obj.name.encode("utf-8") 397 component_list[num + 2].component_addr = \ 398 REGISTER_SCRIPT_FILE_NAME.encode("utf-8") 399 400 if OPTIONS_MANAGER.private_key == ON_SERVER: 401 private_key = "./update_package.py" 402 else: 403 private_key = OPTIONS_MANAGER.private_key.encode("utf-8") 404 405 lib.CreatePackage( 406 ctypes.pointer(head_list), component_list, file_save_patch, 407 private_key) 408 return file_obj 409 410 411def build_update_package(no_zip, update_package, prelude_script, 412 verse_script, refrain_script, ending_script): 413 """ 414 Create the update package file. 415 :param no_zip: no zip 416 :param update_package: update package path 417 :param prelude_script: prelude object 418 :param verse_script: verse object 419 :param refrain_script: refrain object 420 :param ending_script: ending object 421 :return: If exception occurs, return False. 422 """ 423 update_bin_obj, lib = create_update_bin() 424 OPTIONS_MANAGER.update_bin_obj = update_bin_obj 425 426 update_file_name = ''.join( 427 [OPTIONS_MANAGER.product, '_ota_', 428 time.strftime("%H%M%S", time.localtime())]) 429 if not no_zip: 430 update_package_path = os.path.join( 431 update_package, '%s.zip' % update_file_name) 432 OPTIONS_MANAGER.update_package_file_path = update_package_path 433 434 create_script(prelude_script, verse_script, 435 refrain_script, ending_script) 436 437 build_tools_zip_obj = create_build_tools_zip(lib) 438 if build_tools_zip_obj is False: 439 UPDATE_LOGGER.print_log( 440 "Create build tools zip failed!", 441 log_type=UPDATE_LOGGER.ERROR_LOG) 442 return False 443 OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj 444 head_list = PkgHeader() 445 if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256: 446 # PKG_DIGEST_TYPE_SHA384 3,use sha384 447 head_list.digest_method = 3 448 else: 449 # PKG_DIGEST_TYPE_SHA256 2,use sha256 450 head_list.digest_method = 2 451 if OPTIONS_MANAGER.private_key == ON_SERVER: 452 head_list.sign_method = 0 453 else: 454 if OPTIONS_MANAGER.signing_algorithm == "ECC": 455 # signing algorithm use ECC 456 head_list.sign_method = SignMethod.ECC.value 457 else: 458 # signing algorithm use RSA 459 head_list.sign_method = SignMethod.RSA.value 460 head_list.pkg_type = 2 461 head_list.pkg_flags = 0 462 head_list.entry_count = 2 463 pkg_components = PkgComponent * 2 464 component_list = pkg_components() 465 component_list[0].file_path = \ 466 OPTIONS_MANAGER.update_bin_obj.name.encode("utf-8") 467 component_list[0].component_addr = 'update.bin'.encode("utf-8") 468 component_list[1].file_path = \ 469 OPTIONS_MANAGER.build_tools_zip_obj.name.encode("utf-8") 470 component_list[1].component_addr = \ 471 BUILD_TOOLS_FILE_NAME.encode("utf-8") 472 473 sign_info = SignInfo() 474 if OPTIONS_MANAGER.private_key == ON_SERVER: 475 private_key = "./update_package.py" 476 else: 477 private_key = OPTIONS_MANAGER.private_key.encode("utf-8") 478 lib = get_lib_api() 479 lib_l1 = get_lib_api(is_l2=False) 480 if OPTIONS_MANAGER.not_l2: 481 lib_l1.CreatePackageWithSignInfo( 482 ctypes.pointer(head_list), component_list, 483 update_package_path.encode("utf-8"), 484 private_key, ctypes.pointer(sign_info)) 485 else: 486 lib.CreatePackage( 487 ctypes.pointer(head_list), component_list, 488 update_package_path.encode("utf-8"), 489 OPTIONS_MANAGER.private_key.encode("utf-8")) 490 491 if OPTIONS_MANAGER.private_key == ON_SERVER: 492 hash_code = "".join(["%x" % each for each in sign_info.hash_code]) 493 signing_package(update_bin_obj.name, 494 OPTIONS_MANAGER.hash_algorithm, hash_code, 495 package_type='.zip') 496 497 UPDATE_LOGGER.print_log(".zip package signing success!") 498 UPDATE_LOGGER.print_log( 499 "Create update package .bin complete! path: %s" % 500 update_package_path) 501 else: 502 update_package_path = os.path.join( 503 update_package, '%s.bin' % update_file_name) 504 OPTIONS_MANAGER.update_package_file_path = update_package_path 505 with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f: 506 content = r_f.read() 507 with open(update_package_path, 'wb') as w_f: 508 w_f.write(content) 509 return True 510 511 512def get_hash_content(file_path, hash_algorithm): 513 """ 514 Use SHA256SUM to get the hash value of the file. 515 :param file_path : file path 516 :param hash_algorithm: hash algorithm 517 :return hash_content: hash value 518 """ 519 try: 520 cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path] 521 except KeyError: 522 UPDATE_LOGGER.print_log( 523 "Unsupported hash algorithm! %s" % hash_algorithm, 524 log_type=UPDATE_LOGGER.ERROR_LOG) 525 return None 526 if not os.path.exists(file_path): 527 UPDATE_LOGGER.print_log( 528 "%s failed!" % LINUX_HASH_ALGORITHM_DICT.get(hash_algorithm), 529 UPDATE_LOGGER.ERROR_LOG) 530 raise RuntimeError 531 process_obj = subprocess.Popen( 532 cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 533 process_obj.wait() 534 hash_content = \ 535 process_obj.stdout.read().decode(encoding='gbk').split(' ')[0] 536 if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm): 537 UPDATE_LOGGER.print_log( 538 "Get hash content failed! The length of the hash_content is 0!", 539 UPDATE_LOGGER.ERROR_LOG) 540 raise RuntimeError 541 if process_obj.returncode == 0: 542 UPDATE_LOGGER.print_log( 543 "Get hash content success! path: %s" % file_path) 544 return hash_content 545