#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import copy
import os
import subprocess
import tempfile
import time
import collections as collect
import enum
import ctypes
import zipfile

from log_exception import UPDATE_LOGGER
from script_generator import create_script
from utils import sign_package
from utils import HASH_CONTENT_LEN_DICT
from utils import OPTIONS_MANAGER
from utils import REGISTER_SCRIPT_FILE_NAME
from utils import ON_SERVER
from utils import SCRIPT_KEY_LIST
from utils import EXTEND_OPTIONAL_COMPONENT_LIST
from utils import COMPONENT_INFO_INNIT
from utils import UPDATE_EXE_FILE_NAME
from utils import TOTAL_SCRIPT_FILE_NAME
from utils import EXTEND_COMPONENT_LIST
from utils import LINUX_HASH_ALGORITHM_DICT
from utils import BUILD_TOOLS_FILE_NAME
from utils import SIGN_PACKAGE_EVENT
from create_update_package import CreatePackage
from create_update_package import SIGN_ALGO_RSA
from create_update_package import SIGN_ALGO_PSS

# Value written to PkgComponent.flags for every component.
IS_DEL = 0
# Signing length for which get_head_list selects the SHA256 digest method.
SIGNING_LENGTH_256 = 256
# Size in bytes of the digest buffer stored in PkgComponent.
DIGEST_LEN = 32
# Capacity (minus one) of the hash_code buffer in SignInfo.
HASH_VALUE_MAX_LEN = 128


class SignMethod(enum.Enum):
    """Values stored in PkgHeader.sign_method for locally signed packages."""
    RSA = 1
    ECC = 2


class PkgHeader(ctypes.Structure):
    """ctypes layout of the package header handed to CreatePackage."""
    _fields_ = [("digest_method", ctypes.c_ubyte),
                ("sign_method", ctypes.c_ubyte),
                ("pkg_type", ctypes.c_ubyte),
                ("pkg_flags", ctypes.c_ubyte),
                ("entry_count", ctypes.c_int),
                ("update_file_version", ctypes.c_int),
                ("product_update_id", ctypes.c_char_p),
                ("software_version", ctypes.c_char_p),
                ("date", ctypes.c_char_p),
                ("time", ctypes.c_char_p),
                ("describe_package_id", ctypes.c_char_p)]


class PkgComponent(ctypes.Structure):
    """ctypes layout of one component entry handed to CreatePackage."""
    _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN),
                ("file_path", ctypes.c_char_p),
                ("component_addr", ctypes.c_char_p),
                ("version", ctypes.c_char_p),
                ("size", ctypes.c_int),
                ("id", ctypes.c_int),
                ("original_size", ctypes.c_int),
                ("res_type", ctypes.c_ubyte),
                ("type", ctypes.c_ubyte),
                ("flags", ctypes.c_ubyte)]


class SignInfo(ctypes.Structure):
    """ctypes layout holding a signature offset, hash length and hash bytes."""
    _fields_ = [("sign_offset", ctypes.c_int),
                ("hash_len", ctypes.c_int),
                ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]


def create_update_bin():
    """
    Call the interface to generate the update.bin file.

    The component order is: extend components (only when not_l2 is unset),
    then incremental images, then full images. The package is written to a
    named temporary file inside OPTIONS_MANAGER.update_package and signed
    with PSS when not_l2 is set, otherwise with RSA.

    :return update_bin_obj: Update file object.
                If exception occurs, return False.
    """
    update_bin_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")

    head_value_list = OPTIONS_MANAGER.head_info_list
    full_img_list = OPTIONS_MANAGER.full_img_list
    incremental_img_list = OPTIONS_MANAGER.incremental_img_list

    # File objects must follow the same order as the image names below.
    all_image_file_obj_list = \
        OPTIONS_MANAGER.incremental_image_file_obj_list + \
        OPTIONS_MANAGER.full_image_file_obj_list

    if OPTIONS_MANAGER.not_l2:
        all_image_name = incremental_img_list + full_img_list
    elif OPTIONS_MANAGER.partition_file_obj is not None:
        all_image_name = \
            EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + \
            incremental_img_list + full_img_list
    else:
        all_image_name = \
            EXTEND_COMPONENT_LIST + incremental_img_list + full_img_list

    # Re-key the component info in packaging order; names without an entry
    # map to None and are filled with defaults by get_component_list.
    source_component_dict = OPTIONS_MANAGER.component_info_dict
    sort_component_dict = collect.OrderedDict()
    for each_image_name in all_image_name:
        sort_component_dict[each_image_name] = \
            source_component_dict.get(each_image_name)
    component_dict = copy.deepcopy(sort_component_dict)

    head_list = get_head_list(len(component_dict), head_value_list)
    component_list = get_component_list(
        all_image_file_obj_list, component_dict)
    if component_list is False:
        UPDATE_LOGGER.print_log(
            "Get component list failed!", UPDATE_LOGGER.ERROR_LOG)
        # Closing the temporary file also removes it from disk.
        update_bin_obj.close()
        return False

    save_patch = update_bin_obj.name.encode("utf-8")
    sign_algo = SIGN_ALGO_PSS if OPTIONS_MANAGER.not_l2 else SIGN_ALGO_RSA

    # create bin
    package = CreatePackage(head_list, component_list, save_patch,
                            OPTIONS_MANAGER.private_key)
    if not package.create_package():
        UPDATE_LOGGER.print_log(
            "Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
        update_bin_obj.close()
        return False
    if not package.sign(sign_algo):
        UPDATE_LOGGER.print_log(".bin package signing failed", UPDATE_LOGGER.ERROR_LOG)
        update_bin_obj.close()
        return False

    UPDATE_LOGGER.print_log(
        "Create update package .bin complete! path: %s" % update_bin_obj.name)
    return update_bin_obj


def get_component_list(all_image_file_obj_list, component_dict):
    """
    Get the list of component information according to
    the component information structure.

    The first entries of component_dict are matched with the extend file
    paths (version/board list and, when present, the partition file); the
    remaining entries are matched by position with all_image_file_obj_list.

    :param all_image_file_obj_list: all image object file list
    :param component_dict: Component information content dict
    :return component_list: List of component information.
                If exception occurs, return False.
    """
    pkg_components = PkgComponent * len(component_dict)
    component_list = pkg_components()

    if OPTIONS_MANAGER.not_l2:
        extend_component_list = []
        extend_path_list = []
    elif OPTIONS_MANAGER.partition_file_obj is not None:
        extend_component_list = \
            EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
        extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                            OPTIONS_MANAGER.board_list_file_path,
                            OPTIONS_MANAGER.partition_file_obj.name]
    else:
        extend_component_list = EXTEND_COMPONENT_LIST
        extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                            OPTIONS_MANAGER.board_list_file_path]
    extend_count = len(extend_component_list)

    for idx, (key, component) in enumerate(component_dict.items()):
        if idx < extend_count:
            file_path = extend_path_list[idx]
        else:
            file_path = all_image_file_obj_list[idx - extend_count].name

        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
        if not digest:
            return False

        if component is None:
            # No description was supplied for this name: use the defaults
            # and fill in the component name.
            component = copy.copy(COMPONENT_INFO_INNIT)
            component[0] = key

        entry = component_list[idx]
        file_size = os.path.getsize(file_path)
        # Only the first DIGEST_LEN bytes of the decoded digest are stored.
        entry.digest = (ctypes.c_ubyte * DIGEST_LEN).from_buffer_copy(
            binascii.a2b_hex(digest.encode('utf-8')))
        entry.file_path = file_path.encode("utf-8")
        if not OPTIONS_MANAGER.not_l2:
            entry.component_addr = ('/%s' % component[0]).encode("utf-8")
        else:
            entry.component_addr = ('%s' % component[0]).encode("utf-8")
        entry.version = component[4].encode("utf-8")
        entry.size = file_size
        entry.id = int(component[1])
        # NOTE(review): component[3] is passed through int() below, so it may
        # be a string, in which case this comparison is never true - confirm.
        if component[3] == 1:
            entry.original_size = file_size
        else:
            entry.original_size = 0
        entry.res_type = int(component[2])
        entry.type = int(component[3])
        entry.flags = IS_DEL
    return component_list
219 220def get_head_list(component_count, head_value_list): 221 """ 222 According to the header structure, get the list of HEAD headers. 223 :param component_count: number of components 224 :param head_value_list: list of header values 225 :return head_list: header list 226 """ 227 head_list = PkgHeader() 228 if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256: 229 # PKG_DIGEST_TYPE_SHA384 3,use sha384 230 head_list.digest_method = 3 231 else: 232 # PKG_DIGEST_TYPE_SHA256 2,use sha256 233 head_list.digest_method = 2 234 if OPTIONS_MANAGER.private_key == ON_SERVER: 235 head_list.sign_method = 0 236 else: 237 if OPTIONS_MANAGER.signing_algorithm == "ECC": 238 # signing algorithm use ECC 239 head_list.sign_method = SignMethod.ECC.value 240 else: 241 # signing algorithm use RSA 242 head_list.sign_method = SignMethod.RSA.value 243 head_list.pkg_type = 1 244 if OPTIONS_MANAGER.not_l2: 245 head_list.pkg_flags = 1 246 else: 247 head_list.pkg_flags = 0 248 head_list.entry_count = component_count 249 head_list.update_file_version = int(head_value_list[0]) 250 head_list.product_update_id = head_value_list[1].encode("utf-8") 251 head_list.software_version = head_value_list[2].encode("utf-8") 252 head_list.date = head_value_list[3].encode("utf-8") 253 head_list.time = head_value_list[4].encode("utf-8") 254 head_list.describe_package_id = ctypes.c_char_p("update/info.bin".encode()) 255 return head_list 256 257 258def get_tools_component_list(count, opera_script_dict): 259 """ 260 Get the list of component information according to 261 the component information structure. 262 :param count: number of components 263 :param opera_script_dict: script file name and path dict 264 :return component_list: list of component information. 265 If exception occurs, return False. 
266 """ 267 pkg_components = PkgComponent * count 268 component_list = pkg_components() 269 component_value_list = list(opera_script_dict.keys()) 270 component_num = 0 271 for i, component in enumerate(component_value_list): 272 component_list[i].file_path = component.encode("utf-8") 273 component_list[i].component_addr = \ 274 (opera_script_dict[component]).encode("utf-8") 275 component_num += 1 276 return component_list, component_num 277 278 279def get_tools_head_list(component_count): 280 """ 281 According to the header structure, get the list of HEAD headers. 282 :param component_count: number of components 283 :return head_list: header list 284 """ 285 head_list = PkgHeader() 286 head_list.digest_method = 0 287 head_list.sign_method = 0 288 head_list.pkg_type = 2 289 head_list.pkg_flags = 0 290 head_list.entry_count = component_count 291 return head_list 292 293 294def get_signing_from_server(package_path, hash_algorithm, hash_code=None): 295 """ 296 Server update package signature requires the vendor to 297 implement its own service signature interface, as shown below: 298 ip = "" 299 user_name = "" 300 pass_word = "" 301 signe_jar = "" 302 signing_config = [signe_jar, ip, user_name, pass_word, 303 hash_code, hash_algorithm] 304 cmd = ' '.join(signing_config) 305 subprocess.Popen( 306 cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 307 :param package_path: update package file path 308 :param hash_algorithm: hash algorithm 309 :param hash_code: hash code 310 :return: 311 """ 312 UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, " 313 "Signing hash code: %s" % 314 (package_path, hash_algorithm, hash_code)) 315 signing_content = "" 316 return signing_content.encode() 317 318 319def create_build_tools_zip(): 320 """ 321 Create the update package file. 
322 :param lib: lib object 323 :return: 324 """ 325 opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict 326 tmp_dict = {} 327 for each in SCRIPT_KEY_LIST: 328 tmp_dict[each] = [] 329 if opera_script_file_name_dict == tmp_dict: 330 UPDATE_LOGGER.print_log( 331 "Script dict is null!", 332 log_type=UPDATE_LOGGER.ERROR_LOG) 333 return False 334 count = 0 335 opera_script_dict = {} 336 for each_value in opera_script_file_name_dict.values(): 337 for each in each_value: 338 opera_script_dict[each[1].name] = each[0] 339 count += 1 340 # other_file_count --> 1(updater_binary) + 1(loadScript.us) 341 other_file_count = 2 342 count += other_file_count 343 if OPTIONS_MANAGER.register_script_file_obj is not None: 344 count += 1 345 head_list = get_tools_head_list(count) 346 component_list, num = \ 347 get_tools_component_list(count, opera_script_dict) 348 total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj 349 register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj 350 update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, 351 UPDATE_EXE_FILE_NAME) 352 if not os.path.exists(update_exe_path): 353 UPDATE_LOGGER.print_log( 354 "updater_binary file does not exist!path: %s" % update_exe_path, 355 log_type=UPDATE_LOGGER.ERROR_LOG) 356 return False 357 358 file_obj = tempfile.NamedTemporaryFile( 359 dir=OPTIONS_MANAGER.update_package, prefix="build_tools-") 360 zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED) 361 # add opera_script to build_tools.zip 362 for key, value in opera_script_dict.items(): 363 zip_file.write(key, value) 364 365 # add update_binary to build_tools.zip 366 zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME) 367 368 # add loadScript to build_tools.zip 369 zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME) 370 371 if OPTIONS_MANAGER.register_script_file_obj is not None: 372 zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME) 373 zip_file.close() 374 
375 return file_obj 376 377 378def build_update_package(no_zip, update_package, prelude_script, 379 verse_script, refrain_script, ending_script): 380 """ 381 Create the update package file. 382 :param no_zip: no zip 383 :param update_package: update package path 384 :param prelude_script: prelude object 385 :param verse_script: verse object 386 :param refrain_script: refrain object 387 :param ending_script: ending object 388 :return: If exception occurs, return False. 389 """ 390 update_bin_obj = create_update_bin() 391 if update_bin_obj: 392 OPTIONS_MANAGER.update_bin_obj = update_bin_obj 393 else: 394 return False 395 396 if OPTIONS_MANAGER.sd_card : 397 package_type = "sd" 398 elif OPTIONS_MANAGER.source_package : 399 package_type = "diff" 400 else : 401 package_type = "full" 402 if OPTIONS_MANAGER.not_l2: 403 update_file_name = ''.join( 404 ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")]) 405 else : 406 update_file_name = ''.join( 407 ["updater_", package_type]) 408 409 if not no_zip: 410 update_package_path = os.path.join( 411 update_package, '%s_unsigned.zip' % update_file_name) 412 OPTIONS_MANAGER.update_package_file_path = update_package_path 413 414 create_script(prelude_script, verse_script, 415 refrain_script, ending_script) 416 417 build_tools_zip_obj = create_build_tools_zip() 418 if build_tools_zip_obj is False: 419 UPDATE_LOGGER.print_log( 420 "Create build tools zip failed!", 421 log_type=UPDATE_LOGGER.ERROR_LOG) 422 return False 423 OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj 424 425 zip_file = zipfile.ZipFile(update_package_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) 426 # add update.bin to update package 427 zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin") 428 # add build_tools.zip to update package 429 zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME) 430 zip_file.close() 431 432 signed_package = os.path.join( 433 update_package, "%s.zip" % 
update_file_name) 434 OPTIONS_MANAGER.signed_package = signed_package 435 if os.path.exists(signed_package): 436 os.remove(signed_package) 437 438 sign_ota_package = \ 439 OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT) 440 if sign_ota_package: 441 sign_result = sign_ota_package() 442 else: 443 sign_result = sign_package() 444 445 if not sign_result: 446 UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG) 447 return False 448 if os.path.exists(update_package_path): 449 os.remove(update_package_path) 450 else: 451 update_package_path = os.path.join( 452 update_package, '%s.bin' % update_file_name) 453 if os.path.exists(update_package_path): 454 os.remove(update_package_path) 455 OPTIONS_MANAGER.update_package_file_path = update_package_path 456 with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f: 457 content = r_f.read() 458 with open(update_package_path, 'wb') as w_f: 459 w_f.write(content) 460 return True 461 462 463def get_hash_content(file_path, hash_algorithm): 464 """ 465 Use SHA256SUM to get the hash value of the file. 466 :param file_path : file path 467 :param hash_algorithm: hash algorithm 468 :return hash_content: hash value 469 """ 470 try: 471 cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path] 472 except KeyError: 473 UPDATE_LOGGER.print_log( 474 "Unsupported hash algorithm! %s" % hash_algorithm, 475 log_type=UPDATE_LOGGER.ERROR_LOG) 476 return False 477 if not os.path.exists(file_path): 478 UPDATE_LOGGER.print_log( 479 "%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm], 480 UPDATE_LOGGER.ERROR_LOG) 481 raise RuntimeError 482 process_obj = subprocess.Popen( 483 cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 484 process_obj.wait() 485 hash_content = \ 486 process_obj.stdout.read().decode(encoding='gbk').split(' ')[0] 487 if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm): 488 UPDATE_LOGGER.print_log( 489 "Get hash content failed! 
The length of the hash_content is 0!", 490 UPDATE_LOGGER.ERROR_LOG) 491 raise RuntimeError 492 if process_obj.returncode == 0: 493 UPDATE_LOGGER.print_log( 494 "Get hash content success! path: %s" % file_path) 495 return hash_content 496