#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Defining constants, common interface
"""

import json
import os
import shutil
import tempfile
import zipfile
from collections import OrderedDict

from copy import copy
from ctypes import cdll
import xmltodict
from cryptography.hazmat.primitives import hashes
from log_exception import UPDATE_LOGGER

# Directory that contains this module; the bundled libraries and tools
# below are located relative to it.
operation_path = os.path.dirname(os.path.realpath(__file__))
PRODUCT = 'hi3516'
BUILD_TOOLS_FILE_NAME = 'build_tools.zip'
UPDATE_EXE_FILE_NAME = "updater_binary"

SCRIPT_KEY_LIST = ['prelude', 'verse', 'refrain', 'ending']
TOTAL_SCRIPT_FILE_NAME = "loadScript.us"
REGISTER_SCRIPT_FILE_NAME = "registerCmd.us"
SCRIPT_FILE_NAME = '-script.us'

UPDATER_CONFIG = "updater_config"
XML_FILE_PATH = "updater_specified_config.xml"
SO_PATH = os.path.join(operation_path, 'lib/libpackage.so')
SO_PATH_L1 = os.path.join(operation_path, 'lib/libpackageL1.so')
DIFF_EXE_PATH = os.path.join(operation_path, 'lib/diff')
MISC_INFO_PATH = "misc_info.txt"
VERSION_MBN_PATH = "VERSION.mbn"
BOARD_LIST_PATH = "BOARD.list"
EXTEND_COMPONENT_LIST = ["version_list", "board_list"]
EXTEND_OPTIONAL_COMPONENT_LIST = ["partitions_file"]
PARTITION_FILE = "partitions_file"
# Partitions that parse_partition_file_xml leaves out of the image list.
IGNORED_PARTITION_LIST = ['fastboot', 'boot', 'kernel', 'misc',
                          'updater', 'userdata']

SPARSE_IMAGE_MAGIC = 0xED26FF3A
# The related data is the original data of blocks set by chunk_size.
CHUNK_TYPE_RAW = 0xCAC1
# The related data is 4-byte fill data.
CHUNK_TYPE_FILL = 0xCAC2
# The related data is empty.
CHUNK_TYPE_DONT_CARE = 0xCAC3
# CRC32 block
CHUNK_TYPE_CRC32 = 0xCAC4

HASH_ALGORITHM_DICT = {'sha256': hashes.SHA256, 'sha384': hashes.SHA384}
LINUX_HASH_ALGORITHM_DICT = {'sha256': 'sha256sum', 'sha384': 'sha384sum'}
HASH_CONTENT_LEN_DICT = {'sha256': 64, 'sha384': 96}

COMPONENT_INFO_INNIT = ['', '000', '00', '0', '0o00']

ON_SERVER = "ON_SERVER"

# The length of the header information of the sparse image is 28 bytes.
HEADER_INFO_FORMAT = "<I4H4I"
HEADER_INFO_LEN = 28
# The length of the chunk information of the sparse image is 12 bytes.
CHUNK_INFO_FORMAT = "<2H2I"
CHUNK_INFO_LEN = 12

EXTEND_VALUE = 512

FILE_MAP_ZERO_KEY = "__ZERO"
FILE_MAP_NONZERO_KEY = "__NONZERO"
FILE_MAP_COPY_KEY = "__COPY"

MAX_BLOCKS_PER_GROUP = BLOCK_LIMIT = 1024
PER_BLOCK_SIZE = 4096
TWO_STEP = "updater"

SD_CARD_IMAGE_LIST = ["system", "vendor", "userdata"]
# Image file mount to partition, Correspondence dict.
IMAGE_FILE_MOUNT_TO_PARTITION_DICT = {"userdata": "data"}


def singleton(cls):
    """
    Class decorator that makes every call of the decorated class return
    one shared instance.
    :param cls: class to wrap
    :return: factory function; the first call builds the instance with
             the arguments of that call, later calls return the same
             instance and ignore their arguments
    """
    _instance = {}

    def _singleton(*args, **kargs):
        if cls not in _instance:
            _instance[cls] = cls(*args, **kargs)
        return _instance[cls]

    return _singleton


@singleton
class OptionsManager:
    """
    Options management class
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """
        Set every option to its initial value.
        Shared by __init__ and clear_options so that the two can never
        drift apart.
        """
        # Own parameters
        self.product = None

        # Entry parameters
        self.source_package = None
        self.target_package = None
        self.update_package = None
        self.no_zip = False
        self.partition_file = None
        self.signing_algorithm = None
        self.hash_algorithm = None
        self.private_key = None
        self.not_l2 = False
        self.signing_length = 256
        self.xml_path = None
        self.sd_card = False

        self.make_dir_path = None

        # Parsed package parameters
        self.target_package_dir = None
        self.target_package_config_dir = None
        self.target_package_temp_obj = None
        self.misc_info_dict = {}
        self.version_mbn_file_path = None
        self.version_mbn_content = None
        self.board_list_file_path = None
        self.board_list_content = None

        self.source_package_dir = None
        self.source_package_temp_obj = None

        # XML parsing parameters
        self.head_info_list = []
        self.component_info_dict = {}
        self.full_img_list = []
        self.incremental_img_list = []
        self.target_package_version = None
        self.source_package_version = None
        self.two_step = False
        self.full_image_path_list = []

        self.partition_file_obj = None

        # Full processing parameters
        self.full_image_content_len_list = []
        self.full_image_file_obj_list = []

        # Incremental processing parameters
        self.incremental_content_len_list = []
        self.incremental_image_file_obj_list = []
        self.incremental_temp_file_obj_list = []

        # Script parameters: one empty list per script section
        self.opera_script_file_name_dict = \
            {each: [] for each in SCRIPT_KEY_LIST}
        self.total_script_file_obj = None

        self.register_script_file_obj = None

        # Update package parameters
        self.update_bin_obj = None
        self.build_tools_zip_obj = None
        self.update_package_file_path = None


OPTIONS_MANAGER = OptionsManager()


def _unzip_failed(tmp_dir_obj, message):
    """
    Log an unzip error, remove the temporary directory if it was already
    created, and build the failure result of unzip_package.
    :param tmp_dir_obj: tempfile.TemporaryDirectory object or None
    :param message: error message to log
    :return: False, False
    """
    UPDATE_LOGGER.print_log(message, log_type=UPDATE_LOGGER.ERROR_LOG)
    if tmp_dir_obj is not None:
        tmp_dir_obj.cleanup()
    return False, False


def unzip_package(package_path, origin='target'):
    """
    Decompress the zip package.
    :param package_path: zip package path
    :param origin: package origin, which indicates
                whether the zip package is a source package or target package
    :return: Temporary directory object (the caller must keep it and call
             cleanup() on it) and the directory that holds the package
             content; (False, False) if an exception occurs or the
             package layout is not supported.
    """
    tmp_dir_obj = None
    try:
        tmp_dir_obj = tempfile.TemporaryDirectory(prefix="%sfiles-" % origin)
        tmp_dir = tmp_dir_obj.name
        tmp_root = os.path.realpath(tmp_dir)

        with zipfile.ZipFile(package_path) as zf_obj:
            for name in zf_obj.namelist():
                ext_filename = os.path.join(tmp_dir, name)
                # Refuse entries such as "../x" or "/abs/x" that would be
                # written outside of the temporary directory.
                real_path = os.path.realpath(ext_filename)
                if real_path != tmp_root and \
                        not real_path.startswith(tmp_root + os.sep):
                    return _unzip_failed(
                        tmp_dir_obj,
                        "Unsafe entry in zip package! entry: %s" % name)
                if name.endswith('/'):
                    os.makedirs(ext_filename, exist_ok=True)
                    continue
                # Archives are not required to contain directory entries,
                # so make sure the parent directory exists.
                os.makedirs(os.path.dirname(ext_filename), exist_ok=True)
                # Stream the member instead of loading it into memory.
                with zf_obj.open(name) as f_r, \
                        open(ext_filename, 'wb') as f_w:
                    shutil.copyfileobj(f_r, f_w)
    except (OSError, zipfile.BadZipFile):
        return _unzip_failed(
            tmp_dir_obj, "Unzip package failed! path: %s" % package_path)

    tmp_dir_list = os.listdir(tmp_dir)
    if len(tmp_dir_list) == 1:
        # The whole package is wrapped in a single top-level directory.
        unzip_dir = os.path.join(tmp_dir, tmp_dir_list[0])
        if not os.path.isdir(unzip_dir) or \
                UPDATER_CONFIG not in os.listdir(unzip_dir):
            return _unzip_failed(
                tmp_dir_obj, 'Unsupported zip package structure!')
    elif UPDATER_CONFIG in tmp_dir_list:
        unzip_dir = tmp_dir
    else:
        return _unzip_failed(
            tmp_dir_obj, 'Unsupported zip package structure!')
    UPDATE_LOGGER.print_log(
        '%s package unzip complete! path: %s' % (origin.title(), unzip_dir))

    return tmp_dir_obj, unzip_dir


def parse_update_config(xml_path):
    """
    Parse the XML configuration file.
    :param xml_path: XML configuration file path
    :return: list of seven items:
             head_info_list: header information values of the update
                             package (last value of the head info removed)
             component_info_dict: component information, keyed by the
                                  compAddr attribute
             full_img_list: compAddr of the components with compType '0'
             incremental_img_list: compAddr of the components with
                                   compType '1'
             package_version: softVersion attribute of the head info
             full_image_path_list: image paths of the full components
             two_step: True if a component is named TWO_STEP
             All seven items are False if parsing fails.
    """
    failed_params = [False, False, False, False, False, False, False]
    two_step = False
    if os.path.exists(xml_path):
        with open(xml_path, 'r') as xml_file:
            xml_str = xml_file.read()
    else:
        UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
                                xml_path, UPDATE_LOGGER.ERROR_LOG)
        return failed_params
    xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
    # Empty elements are parsed to None, hence the "or {}" fallbacks.
    package_dict = xml_content_dict.get('package') or {}
    head_dict = (package_dict.get('head') or {}).get('info')
    if not isinstance(head_dict, dict):
        UPDATE_LOGGER.print_log(
            "Head info is missing in the XML file! xml path: %s" %
            xml_path, UPDATE_LOGGER.ERROR_LOG)
        return failed_params
    package_version = head_dict.get("@softVersion")
    # component
    component_info = (package_dict.get('group') or {}).get('component')
    head_list = list(head_dict.values())
    head_list.pop()
    whole_list = []
    difference_list = []
    component_dict = {}
    full_image_path_list = []

    if not OPTIONS_MANAGER.not_l2:
        expand_component(component_dict)
    # A single <component> is parsed to one mapping instead of a list.
    # Depending on the xmltodict version this is an OrderedDict or a plain
    # dict; OrderedDict is a dict subclass, so one check covers both.
    if isinstance(component_info, dict):
        component_info = [component_info]
    if component_info is None:
        return [[], {}, [], [], '', [], False]
    # compAddr values already recorded as full or incremental image.
    known_addr = set()
    for component in component_info:
        comp_addr = component['@compAddr']
        component_list = list(component.values())
        component_list.pop()
        component_dict[comp_addr] = component_list

        if comp_addr in known_addr:
            UPDATE_LOGGER.print_log("This component %s  repeats!" %
                                    comp_addr,
                                    UPDATE_LOGGER.ERROR_LOG)
            return failed_params

        if component['@compType'] == '0':
            image_name = component.get("#text", None)
            if image_name is None:
                UPDATE_LOGGER.print_log(
                    "This component %s has no image file name!" %
                    comp_addr, UPDATE_LOGGER.ERROR_LOG)
                return failed_params
            whole_list.append(comp_addr)
            known_addr.add(comp_addr)
            full_image_path_list.append(
                os.path.join(OPTIONS_MANAGER.target_package_dir,
                             image_name))
        elif component['@compType'] == '1':
            difference_list.append(comp_addr)
            known_addr.add(comp_addr)

        if comp_addr == TWO_STEP:
            two_step = True

    UPDATE_LOGGER.print_log('XML file parsing completed!')
    if OPTIONS_MANAGER.sd_card:
        # Copy the constant so that callers cannot modify it through the
        # returned list.
        whole_list = list(SD_CARD_IMAGE_LIST)
        difference_list = []
        full_image_path_list = \
            [os.path.join(OPTIONS_MANAGER.target_package_dir, "%s.img" % each)
             for each in SD_CARD_IMAGE_LIST]
    return [head_list, component_dict,
            whole_list, difference_list, package_version,
            full_image_path_list, two_step]


def partitions_conversion(data):
    """
    Convert the start or length data of the partition table.
    '0' is converted to 0; a value such as '16M' is converted to
    16 * 1024 * 1024 // 512 (presumably the number of 512-byte sectors).
    :param data: start or length data
    :return: converted int value; False if the data cannot be converted.
             0 is a valid result, so callers must test with "is False".
    """
    if data == '0':
        return 0
    if isinstance(data, str) and data.endswith('M'):
        try:
            return int(data[0:-1]) * 1024 * 1024 // 512
        except ValueError:
            return False
    return False


def parse_partition_file_xml(xml_path):
    """
    Parse the partition table XML file.
    :param xml_path: XML configuration file path
    :return: file_obj: open temporary file that holds the partition table
                       in JSON format (the caller must close it)
             partitions_list: partition names that are not in
                              IGNORED_PARTITION_LIST
             partitions_file_path_list: "<name>.img" paths of these
                                        partitions in the target package
             (False, False, False) if parsing fails.
    """
    if os.path.exists(xml_path):
        with open(xml_path, 'r') as xml_file:
            xml_str = xml_file.read()
    else:
        UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
                                xml_path, UPDATE_LOGGER.ERROR_LOG)
        return False, False, False
    partitions_list = []
    partitions_file_path_list = []
    xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
    partition_info = xml_content_dict.get('Partition_Info')
    part_list = partition_info.get('Part') \
        if isinstance(partition_info, dict) else None
    if part_list is None:
        UPDATE_LOGGER.print_log(
            "Partition file parsing failed! part_name: %s, xml_path: %s" %
            (None, xml_path),
            UPDATE_LOGGER.ERROR_LOG)
        return False, False, False
    # A single <Part> is parsed to one mapping instead of a list.
    if isinstance(part_list, dict):
        part_list = [part_list]
    new_part_list = []
    for part in part_list:
        part_name = part.get('@PartitionName')
        start_value = partitions_conversion(part.get('@Start'))
        length_value = partitions_conversion(part.get('@Length'))
        if start_value is False or length_value is False:
            UPDATE_LOGGER.print_log(
                "Partition file parsing failed! part_name: %s, xml_path: %s" %
                (part_name, xml_path),
                UPDATE_LOGGER.ERROR_LOG)
            return False, False, False

        if part_name not in IGNORED_PARTITION_LIST:
            partitions_list.append(part_name)
            partitions_file_path_list.append(
                os.path.join(OPTIONS_MANAGER.target_package_dir,
                             "%s.img" % part_name))
        new_part_list.append({'start': start_value,
                              'length': length_value,
                              'partName': part_name,
                              'fsType': part.get('@FlashType')})
    part_json = '{"Partition": %s}' % json.dumps(new_part_list)
    file_obj = tempfile.NamedTemporaryFile(prefix="partition_file-", mode='wb')
    file_obj.write(part_json.encode())
    # Push the buffered bytes to disk so that the content is visible to
    # whoever opens the file through its name.
    file_obj.flush()
    file_obj.seek(0)
    return file_obj, partitions_list, partitions_file_path_list


def expand_component(component_dict):
    """
    Append components such as VERSION.mbn and board list.
    The partition file component is appended as well if a partition file
    is set in OPTIONS_MANAGER.
    :param component_dict: component information dict, updated in place
    :return:
    """
    if OPTIONS_MANAGER.partition_file is not None:
        extend_component_list = \
            EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
    else:
        extend_component_list = EXTEND_COMPONENT_LIST
    for each in extend_component_list:
        # Every component gets its own copy of the initial info list.
        tmp_info_list = copy(COMPONENT_INFO_INNIT)
        tmp_info_list[0] = each
        component_dict[each] = tmp_info_list


def clear_options():
    """
    Clear OPTIONS_MANAGER: set every option back to its initial value.
    """
    OPTIONS_MANAGER.reset()


def clear_resource(err_clear=False):
    """
    Clear resources, close temporary files, and clear temporary paths.
    :param err_clear: True if the cleaning is caused by an error; the
                      generated directory and update package are removed
                      as well in this case
    :return:
    """
    for temp_dir_obj in (OPTIONS_MANAGER.target_package_temp_obj,
                         OPTIONS_MANAGER.source_package_temp_obj):
        if temp_dir_obj is not None:
            temp_dir_obj.cleanup()

    for file_obj in (OPTIONS_MANAGER.partition_file_obj,
                     OPTIONS_MANAGER.build_tools_zip_obj,
                     OPTIONS_MANAGER.update_bin_obj,
                     OPTIONS_MANAGER.total_script_file_obj,
                     OPTIONS_MANAGER.register_script_file_obj):
        if file_obj is not None:
            file_obj.close()

    for each_full_obj in OPTIONS_MANAGER.full_image_file_obj_list:
        each_full_obj.close()

    clear_file_obj(err_clear)
    clear_options()


def clear_file_obj(err_clear):
    """
    Clear resources and temporary file objects.
    :param err_clear: True if the cleaning is caused by an error; the
                      generated directory and update package are removed
                      as well in this case
    :return:
    """
    for each_incremental_temp_obj in \
            OPTIONS_MANAGER.incremental_temp_file_obj_list:
        if each_incremental_temp_obj is not None:
            each_incremental_temp_obj.close()
    for each_incremental_obj in \
            OPTIONS_MANAGER.incremental_image_file_obj_list:
        if each_incremental_obj is not None:
            each_incremental_obj.close()
    for each_value in OPTIONS_MANAGER.opera_script_file_name_dict.values():
        for each in each_value:
            # The second item of every entry is the object to close.
            each[1].close()
    if err_clear:
        make_dir_path = OPTIONS_MANAGER.make_dir_path
        if make_dir_path is not None and os.path.exists(make_dir_path):
            shutil.rmtree(make_dir_path)
        update_package_file_path = OPTIONS_MANAGER.update_package_file_path
        if update_package_file_path is not None and \
                os.path.exists(update_package_file_path):
            os.remove(update_package_file_path)
        UPDATE_LOGGER.print_log(
            'Exception occurred, Resource cleaning completed!')
    else:
        UPDATE_LOGGER.print_log('Resource cleaning completed!')


def get_file_content(file_path, file_name=None):
    """
    Read the file content.
    :param file_path: file path
    :param file_name: file name, only used in the log messages
    :return: file content; False if the file does not exist
    """
    if not os.path.exists(file_path):
        UPDATE_LOGGER.print_log(
            "%s is not exist! path: %s" % (file_name, file_path),
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    with open(file_path, 'r') as r_f:
        file_content = r_f.read()
    UPDATE_LOGGER.print_log(
        "%s file parsing complete! path: %s" % (file_name, file_path))
    return file_content


def get_update_info():
    """
    Parse the configuration file to obtain the update information and
    store it in OPTIONS_MANAGER.
    :return: True if the update information is obtained; False otherwise.
    """
    if not OPTIONS_MANAGER.not_l2:
        config_dir = OPTIONS_MANAGER.target_package_config_dir

        version_mbn_file_path = os.path.join(config_dir, VERSION_MBN_PATH)
        OPTIONS_MANAGER.version_mbn_file_path = version_mbn_file_path
        version_mbn_content = get_file_content(
            version_mbn_file_path, os.path.basename(version_mbn_file_path))
        if version_mbn_content is False:
            UPDATE_LOGGER.print_log(
                "Get version mbn content failed!",
                log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        OPTIONS_MANAGER.version_mbn_content = version_mbn_content

        board_list_file_path = os.path.join(config_dir, BOARD_LIST_PATH)
        OPTIONS_MANAGER.board_list_file_path = board_list_file_path
        board_list_content = get_file_content(
            board_list_file_path, os.path.basename(board_list_file_path))
        if board_list_content is False:
            UPDATE_LOGGER.print_log(
                "Get board list content failed!",
                log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        OPTIONS_MANAGER.board_list_content = board_list_content

    if OPTIONS_MANAGER.xml_path is None:
        xml_file_path = os.path.join(
            OPTIONS_MANAGER.target_package_config_dir, XML_FILE_PATH)
    else:
        xml_file_path = OPTIONS_MANAGER.xml_path

    # Parse the XML configuration file. The result is kept in local
    # variables until it is validated, so that a failed parse does not
    # store False values in OPTIONS_MANAGER.
    head_info_list, component_info_dict, \
        full_img_list, incremental_img_list, \
        target_package_version, full_image_path_list, two_step = \
        parse_update_config(xml_file_path)
    if head_info_list is False or component_info_dict is False or \
            full_img_list is False or incremental_img_list is False:
        UPDATE_LOGGER.print_log(
            "Get parse update config xml failed!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    OPTIONS_MANAGER.target_package_version = target_package_version
    OPTIONS_MANAGER.full_image_path_list = full_image_path_list
    OPTIONS_MANAGER.two_step = two_step
    OPTIONS_MANAGER.head_info_list = head_info_list
    OPTIONS_MANAGER.component_info_dict = component_info_dict
    OPTIONS_MANAGER.full_img_list = full_img_list
    OPTIONS_MANAGER.incremental_img_list = incremental_img_list
    return True


def get_lib_api(is_l2=True):
    """
    Get the so API.
    :param is_l2: True to load SO_PATH, False to load SO_PATH_L1
    :return: library object loaded through ctypes
    :raise RuntimeError: if the so file does not exist
    """
    so_path = SO_PATH if is_l2 else SO_PATH_L1
    if not os.path.exists(so_path):
        message = "So does not exist! so path: %s" % so_path
        UPDATE_LOGGER.print_log(message, UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError(message)
    return cdll.LoadLibrary(so_path)