• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""
17Description : Defining constants, common interface
18"""
19
20import json
21import os
22import shutil
23import tempfile
24import zipfile
25from collections import OrderedDict
26
27from copy import copy
28from ctypes import cdll
29import xmltodict
30from cryptography.hazmat.primitives import hashes
31from log_exception import UPDATE_LOGGER
32
33operation_path = os.path.dirname(os.path.realpath(__file__))
34PRODUCT = 'hi3516'
35BUILD_TOOLS_FILE_NAME = 'build_tools.zip'
36UPDATE_EXE_FILE_NAME = "updater_binary"
37
38SCRIPT_KEY_LIST = ['prelude', 'verse', 'refrain', 'ending']
39TOTAL_SCRIPT_FILE_NAME = "loadScript.us"
40REGISTER_SCRIPT_FILE_NAME = "registerCmd.us"
41SCRIPT_FILE_NAME = '-script.us'
42
43UPDATER_CONFIG = "updater_config"
44XML_FILE_PATH = "updater_specified_config.xml"
45SO_PATH = os.path.join(operation_path, 'lib/libpackage.so')
46SO_PATH_L1 = os.path.join(operation_path, 'lib/libpackageL1.so')
47DIFF_EXE_PATH = os.path.join(operation_path, 'lib/diff')
48MISC_INFO_PATH = "misc_info.txt"
49VERSION_MBN_PATH = "VERSION.mbn"
50BOARD_LIST_PATH = "BOARD.list"
51EXTEND_COMPONENT_LIST = ["version_list", "board_list"]
52EXTEND_OPTIONAL_COMPONENT_LIST = ["partitions_file"]
53PARTITION_FILE = "partitions_file"
54IGNORED_PARTITION_LIST = ['fastboot', 'boot', 'kernel', 'misc',
55                          'updater', 'userdata']
56
57SPARSE_IMAGE_MAGIC = 0xED26FF3A
58# The related data is the original data of blocks set by chunk_size.
59CHUNK_TYPE_RAW = 0xCAC1
60# The related data is 4-byte fill data.
61CHUNK_TYPE_FILL = 0xCAC2
62# The related data is empty.
63CHUNK_TYPE_DONT_CARE = 0xCAC3
64# CRC32 block
65CHUNK_TYPE_CRC32 = 0xCAC4
66
67HASH_ALGORITHM_DICT = {'sha256': hashes.SHA256, 'sha384': hashes.SHA384}
68LINUX_HASH_ALGORITHM_DICT = {'sha256': 'sha256sum', 'sha384': 'sha384sum'}
69HASH_CONTENT_LEN_DICT = {'sha256': 64, 'sha384': 96}
70
71COMPONENT_INFO_INNIT = ['', '000', '00', '0', '0o00']
72
73ON_SERVER = "ON_SERVER"
74
75# The length of the header information of the sparse image is 28 bytes.
76HEADER_INFO_FORMAT = "<I4H4I"
77HEADER_INFO_LEN = 28
78# The length of the chunk information of the sparse image is 12 bytes.
79CHUNK_INFO_FORMAT = "<2H2I"
80CHUNK_INFO_LEN = 12
81
82EXTEND_VALUE = 512
83
84FILE_MAP_ZERO_KEY = "__ZERO"
85FILE_MAP_NONZERO_KEY = "__NONZERO"
86FILE_MAP_COPY_KEY = "__COPY"
87
88MAX_BLOCKS_PER_GROUP = BLOCK_LIMIT = 1024
89PER_BLOCK_SIZE = 4096
90TWO_STEP = "updater"
91
92SD_CARD_IMAGE_LIST = ["system", "vendor", "userdata"]
93# Image file mount to partition, Correspondence dict.
94IMAGE_FILE_MOUNT_TO_PARTITION_DICT = {"userdata": "data"}
95
96
97def singleton(cls):
98    _instance = {}
99
100    def _singleton(*args, **kargs):
101        if cls not in _instance:
102            _instance[cls] = cls(*args, **kargs)
103        return _instance[cls]
104
105    return _singleton
106
107
108@singleton
109class OptionsManager:
110    """
111    Options management class
112    """
113
114    def __init__(self):
115
116        # Own parameters
117        self.product = None
118
119        # Entry parameters
120        self.source_package = None
121        self.target_package = None
122        self.update_package = None
123        self.no_zip = False
124        self.partition_file = None
125        self.signing_algorithm = None
126        self.hash_algorithm = None
127        self.private_key = None
128        self.not_l2 = False
129        self.signing_length = 256
130        self.xml_path = None
131        self.sd_card = False
132
133        self.make_dir_path = None
134
135        # Parsed package parameters
136        self.target_package_dir = None
137        self.target_package_config_dir = None
138        self.target_package_temp_obj = None
139        self.misc_info_dict = {}
140        self.version_mbn_file_path = None
141        self.version_mbn_content = None
142        self.board_list_file_path = None
143        self.board_list_content = None
144
145        self.source_package_dir = None
146        self.source_package_temp_obj = None
147
148        # XML parsing parameters
149        self.head_info_list = []
150        self.component_info_dict = {}
151        self.full_img_list = []
152        self.incremental_img_list = []
153        self.target_package_version = None
154        self.source_package_version = None
155        self.two_step = False
156        self.full_image_path_list = []
157
158        self.partition_file_obj = None
159
160        # Full processing parameters
161        self.full_image_content_len_list = []
162        self.full_image_file_obj_list = []
163
164        # Incremental processing parameters
165        self.incremental_content_len_list = []
166        self.incremental_image_file_obj_list = []
167        self.incremental_temp_file_obj_list = []
168
169        # Script parameters
170        self.opera_script_file_name_dict = {}
171        for each in SCRIPT_KEY_LIST:
172            self.opera_script_file_name_dict[each] = []
173        self.total_script_file_obj = None
174
175        self.register_script_file_obj = None
176
177        # Update package parameters
178        self.update_bin_obj = None
179        self.build_tools_zip_obj = None
180        self.update_package_file_path = None
181
182
183OPTIONS_MANAGER = OptionsManager()
184
185
186def unzip_package(package_path, origin='target'):
187    """
188    Decompress the zip package.
189    :param package_path: zip package path
190    :param origin: package origin, which indicates
191                whether the zip package is a source package or target package
192    :return: Temporary directory (tmp_dir) and zip_data package;
193            false if an exception occurs.
194    """
195    try:
196        tmp_dir_obj = tempfile.TemporaryDirectory(prefix="%sfiles-" % origin)
197        tmp_dir = tmp_dir_obj.name
198
199        zf_obj = zipfile.ZipFile(package_path)
200        for name in zf_obj.namelist():
201            if name.endswith('/'):
202                os.mkdir(os.path.join(tmp_dir, name))
203            else:
204                ext_filename = os.path.join(
205                    tmp_dir, name)
206                with open(ext_filename, 'wb') as f_w:
207                    f_w.write(zf_obj.read(name))
208    except OSError:
209        UPDATE_LOGGER.print_log(
210            "Unzip package failed! path: %s" % package_path,
211            log_type=UPDATE_LOGGER.ERROR_LOG)
212        return False, False
213    tmp_dir_list = os.listdir(tmp_dir)
214    if len(tmp_dir_list) == 1:
215        unzip_dir = os.path.join(tmp_dir, tmp_dir_list[0])
216        if UPDATER_CONFIG not in \
217                os.listdir(unzip_dir):
218            UPDATE_LOGGER.print_log(
219                'Unsupported zip package structure!', UPDATE_LOGGER.ERROR_LOG)
220            return False, False
221    elif UPDATER_CONFIG in tmp_dir_list:
222        unzip_dir = tmp_dir
223    else:
224        UPDATE_LOGGER.print_log(
225            'Unsupported zip package structure!', UPDATE_LOGGER.ERROR_LOG)
226        return False, False
227    UPDATE_LOGGER.print_log(
228        '%s package unzip complete! path: %s' % (origin.title(), unzip_dir))
229
230    return tmp_dir_obj, unzip_dir
231
232
233def parse_update_config(xml_path):
234    """
235    Parse the XML configuration file.
236    :param xml_path: XML configuration file path
237    :return head_info_dict: header information dict of the update package
238            component_info_dict: component information dict
239            full_img_list: full image list
240            incremental_img_list: incremental image list
241    """
242    two_step = False
243    if os.path.exists(xml_path):
244        with open(xml_path, 'r') as xml_file:
245            xml_str = xml_file.read()
246    else:
247        UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
248                                xml_path, UPDATE_LOGGER.ERROR_LOG)
249        ret_params = [False, False, False, False, False, False, False]
250        return ret_params
251    xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
252    package_dict = xml_content_dict.get('package', {})
253    head_dict = package_dict.get('head', {}).get('info')
254    package_version = head_dict.get("@softVersion")
255    # component
256    component_info = package_dict.get('group', {}).get('component')
257    head_list = list(head_dict.values())
258    head_list.pop()
259    whole_list = []
260    difference_list = []
261    component_dict = {}
262    full_image_path_list = []
263
264    if not OPTIONS_MANAGER.not_l2:
265        expand_component(component_dict)
266    if isinstance(component_info, OrderedDict):
267        component_info = [component_info]
268    if component_info is None:
269        ret_params = [[], {}, [], [], '', [], False]
270        return ret_params
271    for component in component_info:
272        component_list = list(component.values())
273        component_list.pop()
274        component_dict[component['@compAddr']] = component_list
275
276        if component['@compAddr'] in (whole_list + difference_list):
277            UPDATE_LOGGER.print_log("This component %s  repeats!" %
278                                    component['@compAddr'],
279                                    UPDATE_LOGGER.ERROR_LOG)
280            ret_params = [False, False, False, False, False, False, False]
281            return ret_params
282
283        if component['@compType'] == '0':
284            whole_list.append(component['@compAddr'])
285            tem_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
286                                    component.get("#text", None))
287            full_image_path_list.append(tem_path)
288        elif component['@compType'] == '1':
289            difference_list.append(component['@compAddr'])
290
291        if component['@compAddr'] == TWO_STEP:
292            two_step = True
293
294    UPDATE_LOGGER.print_log('XML file parsing completed!')
295    if OPTIONS_MANAGER.sd_card:
296        whole_list = SD_CARD_IMAGE_LIST
297        difference_list = []
298        full_image_path_list = \
299            [os.path.join(OPTIONS_MANAGER.target_package_dir, "%s.img" % each)
300             for each in SD_CARD_IMAGE_LIST]
301    ret_params = [head_list, component_dict,
302                  whole_list, difference_list, package_version,
303                  full_image_path_list, two_step]
304    return ret_params
305
306
307def partitions_conversion(data):
308    """
309    Convert the start or length data in the partition table through
310    multiply 1024 * 1024 and return the data.
311    :param data: start or length data
312    :return :
313    """
314    if data == '0':
315        return 0
316    elif data.endswith('M'):
317        return int(data[0:-1]) * 1024 * 1024 // 512
318    else:
319        return False
320
321
322def parse_partition_file_xml(xml_path):
323    """
324    Parse the XML configuration file.
325    :param xml_path: XML configuration file path
326    :return part_json: partition table information in JSON format
327    """
328    if os.path.exists(xml_path):
329        with open(xml_path, 'r') as xml_file:
330            xml_str = xml_file.read()
331    else:
332        UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
333                                xml_path, UPDATE_LOGGER.ERROR_LOG)
334        return False, False, False
335    partitions_list = []
336    partitions_file_path_list = []
337    xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
338    part_list = xml_content_dict['Partition_Info']['Part']
339    new_part_list = []
340    for i, part in enumerate(part_list):
341        start_value = partitions_conversion(part.get('@Start'))
342        length_value = partitions_conversion(part.get('@Length'))
343        if start_value is False or length_value is False:
344            UPDATE_LOGGER.print_log(
345                "Partition file parsing failed! part_name: %s, xml_path: %s" %
346                (part.get('@PartitionName'), xml_path),
347                UPDATE_LOGGER.ERROR_LOG)
348            return False, False, False
349
350        if part.get('@PartitionName') not in IGNORED_PARTITION_LIST:
351            partitions_list.append(part.get('@PartitionName'))
352            partitions_file_path_list.append(
353                os.path.join(OPTIONS_MANAGER.target_package_dir,
354                             "%s.img" % part.get('@PartitionName')))
355        part_dict = {'start': start_value,
356                     'length': length_value,
357                     'partName': part.get('@PartitionName'),
358                     'fsType': part.get('@FlashType')}
359        new_part_list.append(part_dict)
360    part_json = json.dumps(new_part_list)
361    part_json = '{"Partition": %s}' % part_json
362    file_obj = tempfile.NamedTemporaryFile(prefix="partition_file-", mode='wb')
363    file_obj.write(part_json.encode())
364    file_obj.seek(0)
365    return file_obj, partitions_list, partitions_file_path_list
366
367
368def expand_component(component_dict):
369    """
370    Append components such as VERSION.mbn and board list.
371    :param component_dict: component information dict
372    :return:
373    """
374    if OPTIONS_MANAGER.partition_file is not None:
375        extend_component_list = \
376            EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
377    else:
378        extend_component_list = EXTEND_COMPONENT_LIST
379    for each in extend_component_list:
380        tmp_info_list = copy(COMPONENT_INFO_INNIT)
381        tmp_info_list[0] = each
382        component_dict[each] = tmp_info_list
383
384
385def clear_options():
386    """
387    Clear OPTIONS_MANAGER.
388    """
389    OPTIONS_MANAGER.product = None
390
391    # Entry parameters
392    OPTIONS_MANAGER.source_package = None
393    OPTIONS_MANAGER.target_package = None
394    OPTIONS_MANAGER.update_package = None
395    OPTIONS_MANAGER.no_zip = False
396    OPTIONS_MANAGER.partition_file = None
397    OPTIONS_MANAGER.signing_algorithm = None
398    OPTIONS_MANAGER.hash_algorithm = None
399    OPTIONS_MANAGER.private_key = None
400    OPTIONS_MANAGER.not_l2 = False
401    OPTIONS_MANAGER.signing_length = 256
402    OPTIONS_MANAGER.xml_path = None
403    OPTIONS_MANAGER.sd_card = False
404
405    OPTIONS_MANAGER.full_image_path_list = []
406
407    OPTIONS_MANAGER.make_dir_path = None
408
409    # Parsed package parameters
410    OPTIONS_MANAGER.target_package_dir = None
411    OPTIONS_MANAGER.target_package_config_dir = None
412    OPTIONS_MANAGER.target_package_temp_obj = None
413    OPTIONS_MANAGER.misc_info_dict = {}
414    OPTIONS_MANAGER.version_mbn_file_path = None
415    OPTIONS_MANAGER.version_mbn_content = None
416    OPTIONS_MANAGER.board_list_file_path = None
417    OPTIONS_MANAGER.board_list_content = None
418
419    OPTIONS_MANAGER.source_package_dir = None
420    OPTIONS_MANAGER.source_package_temp_obj = None
421
422    # XML parsing parameters
423    OPTIONS_MANAGER.head_info_list = []
424    OPTIONS_MANAGER.component_info_dict = {}
425    OPTIONS_MANAGER.full_img_list = []
426    OPTIONS_MANAGER.incremental_img_list = []
427    OPTIONS_MANAGER.target_package_version = None
428    OPTIONS_MANAGER.source_package_version = None
429    OPTIONS_MANAGER.partition_file_obj = None
430
431    # Global processing parameters
432    OPTIONS_MANAGER.full_image_content_len_list = []
433    OPTIONS_MANAGER.full_image_file_obj_list = []
434
435    # Incremental processing parameters
436    OPTIONS_MANAGER.incremental_content_len_list = []
437    OPTIONS_MANAGER.incremental_image_file_obj_list = []
438    OPTIONS_MANAGER.incremental_temp_file_obj_list = []
439
440    # Script parameters
441    OPTIONS_MANAGER.opera_script_file_name_dict = {}
442    for each in SCRIPT_KEY_LIST:
443        OPTIONS_MANAGER.opera_script_file_name_dict[each] = []
444    OPTIONS_MANAGER.total_script_file_obj = None
445
446    OPTIONS_MANAGER.register_script_file_obj = None
447
448    # Update package parameters
449    OPTIONS_MANAGER.update_bin_obj = None
450    OPTIONS_MANAGER.build_tools_zip_obj = None
451    OPTIONS_MANAGER.update_package_file_path = None
452
453
454def clear_resource(err_clear=False):
455    """
456    Clear resources, close temporary files, and clear temporary paths.
457    :param err_clear: whether to clear errors
458    :return:
459    """
460    target_package_temp_obj = OPTIONS_MANAGER.target_package_temp_obj
461    if target_package_temp_obj is not None:
462        target_package_temp_obj.cleanup()
463    source_package_temp_obj = OPTIONS_MANAGER.source_package_temp_obj
464    if source_package_temp_obj is not None:
465        source_package_temp_obj.cleanup()
466
467    partition_file_obj = OPTIONS_MANAGER.partition_file_obj
468    if partition_file_obj is not None:
469        partition_file_obj.close()
470
471    build_tools_zip_obj = OPTIONS_MANAGER.build_tools_zip_obj
472    if build_tools_zip_obj is not None:
473        build_tools_zip_obj.close()
474    update_bin_obj = OPTIONS_MANAGER.update_bin_obj
475    if update_bin_obj is not None:
476        update_bin_obj.close()
477    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
478    if total_script_file_obj is not None:
479        total_script_file_obj.close()
480
481    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
482    if register_script_file_obj is not None:
483        register_script_file_obj.close()
484
485    full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
486    if len(full_image_file_obj_list) != 0:
487        for each_full_obj in full_image_file_obj_list:
488            each_full_obj.close()
489
490    clear_file_obj(err_clear)
491    clear_options()
492
493
494def clear_file_obj(err_clear):
495    """
496    Clear resources and temporary file objects.
497    :param err_clear: whether to clear errors
498    :return:
499    """
500    incremental_temp_file_obj_list = \
501        OPTIONS_MANAGER.incremental_temp_file_obj_list
502    if len(incremental_temp_file_obj_list) != 0:
503        for each_incremental_temp_obj in incremental_temp_file_obj_list:
504            if each_incremental_temp_obj is not None:
505                each_incremental_temp_obj.close()
506    incremental_image_file_obj_list = \
507        OPTIONS_MANAGER.incremental_image_file_obj_list
508    if len(incremental_image_file_obj_list) != 0:
509        for each_incremental_obj in incremental_image_file_obj_list:
510            if each_incremental_obj is not None:
511                each_incremental_obj.close()
512    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
513    for each_value in opera_script_file_name_dict.values():
514        for each in each_value:
515            each[1].close()
516    if err_clear:
517        make_dir_path = OPTIONS_MANAGER.make_dir_path
518        if make_dir_path is not None and os.path.exists(make_dir_path):
519            shutil.rmtree(make_dir_path)
520        update_package_file_path = OPTIONS_MANAGER.update_package_file_path
521        if update_package_file_path is not None and \
522                os.path.exists(update_package_file_path):
523            os.remove(update_package_file_path)
524        UPDATE_LOGGER.print_log(
525            'Exception occurred, Resource cleaning completed!')
526    else:
527        UPDATE_LOGGER.print_log('Resource cleaning completed!')
528
529
530def get_file_content(file_path, file_name=None):
531    """
532    Read the file content.
533    :param file_path: file path
534    :param file_name: file name
535    :return: file content
536    """
537    if not os.path.exists(file_path):
538        UPDATE_LOGGER.print_log(
539            "%s is not exist! path: %s" % (file_name, file_path),
540            log_type=UPDATE_LOGGER.ERROR_LOG)
541        return False
542    with open(file_path, 'r') as r_f:
543        file_content = r_f.read()
544    UPDATE_LOGGER.print_log(
545        "%s file parsing complete! path: %s" % (file_name, file_path))
546    return file_content
547
548
549def get_update_info():
550    """
551    Parse the configuration file to obtain the update information.
552    :return: update information if any; false otherwise.
553    """
554    if not OPTIONS_MANAGER.not_l2:
555        OPTIONS_MANAGER.version_mbn_file_path = os.path.join(
556            OPTIONS_MANAGER.target_package_config_dir, VERSION_MBN_PATH)
557        version_mbn_content = \
558            get_file_content(
559                OPTIONS_MANAGER.version_mbn_file_path, os.path.basename(
560                    os.path.join(OPTIONS_MANAGER.target_package_config_dir,
561                                 VERSION_MBN_PATH)))
562        if version_mbn_content is False:
563            UPDATE_LOGGER.print_log(
564                "Get version mbn content failed!",
565                log_type=UPDATE_LOGGER.ERROR_LOG)
566            return False
567        OPTIONS_MANAGER.version_mbn_content = version_mbn_content
568        OPTIONS_MANAGER.board_list_file_path = os.path.join(
569            OPTIONS_MANAGER.target_package_config_dir, BOARD_LIST_PATH)
570        board_list_content = \
571            get_file_content(
572                OPTIONS_MANAGER.board_list_file_path, os.path.basename(
573                    os.path.join(OPTIONS_MANAGER.target_package_config_dir,
574                                 BOARD_LIST_PATH)))
575        if board_list_content is False:
576            UPDATE_LOGGER.print_log(
577                "Get board list content failed!",
578                log_type=UPDATE_LOGGER.ERROR_LOG)
579            return False
580        OPTIONS_MANAGER.board_list_content = board_list_content
581
582    if OPTIONS_MANAGER.xml_path is None:
583        xml_file_path = os.path.join(
584            OPTIONS_MANAGER.target_package_config_dir, XML_FILE_PATH)
585    else:
586        xml_file_path = OPTIONS_MANAGER.xml_path
587
588    # Parse the XML configuration file.
589    head_info_list, component_info_dict, \
590        full_img_list, incremental_img_list, \
591        OPTIONS_MANAGER.target_package_version, \
592        OPTIONS_MANAGER.full_image_path_list, \
593        OPTIONS_MANAGER.two_step = \
594        parse_update_config(xml_file_path)
595    if head_info_list is False or component_info_dict is False or \
596            full_img_list is False or incremental_img_list is False:
597        UPDATE_LOGGER.print_log(
598            "Get parse update config xml failed!",
599            log_type=UPDATE_LOGGER.ERROR_LOG)
600        return False
601    OPTIONS_MANAGER.head_info_list, OPTIONS_MANAGER.component_info_dict, \
602        OPTIONS_MANAGER.full_img_list, OPTIONS_MANAGER.incremental_img_list = \
603        head_info_list, component_info_dict, \
604        full_img_list, incremental_img_list
605    return True
606
607
608def get_lib_api(is_l2=True):
609    """
610    Get the so API.
611    :param is_l2: Is it L2 so
612    :return:
613    """
614    if is_l2:
615        so_path = SO_PATH
616    else:
617        so_path = SO_PATH_L1
618    if not os.path.exists(so_path):
619        UPDATE_LOGGER.print_log(
620            "So does not exist! so path: %s" % so_path,
621            UPDATE_LOGGER.ERROR_LOG)
622        raise RuntimeError
623    lib = cdll.LoadLibrary(so_path)
624    return lib
625