• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import binascii
17import copy
18import os
19import subprocess
20import tempfile
21import time
22import collections as collect
23import enum
24import ctypes
25import zipfile
26
27from log_exception import UPDATE_LOGGER
28from script_generator import create_script
29from utils import sign_package
30from utils import HASH_CONTENT_LEN_DICT
31from utils import OPTIONS_MANAGER
32from utils import REGISTER_SCRIPT_FILE_NAME
33from utils import ON_SERVER
34from utils import SCRIPT_KEY_LIST
35from utils import EXTEND_OPTIONAL_COMPONENT_LIST
36from utils import COMPONENT_INFO_INNIT
37from utils import UPDATE_EXE_FILE_NAME
38from utils import TOTAL_SCRIPT_FILE_NAME
39from utils import EXTEND_COMPONENT_LIST
40from utils import LINUX_HASH_ALGORITHM_DICT
41from utils import UPDATE_BIN_FILE_NAME
42from utils import BUILD_TOOLS_FILE_NAME
43from utils import SIGN_PACKAGE_EVENT
44from utils import GENERATE_SIGNED_DATA_EVENT
45from create_update_package import CreatePackage
46from create_update_package import SIGN_ALGO_RSA
47from create_update_package import SIGN_ALGO_PSS
48from create_signed_data import generate_signed_data_default
49
50IS_DEL = 0
51SIGNING_LENGTH_256 = 256
52DIGEST_LEN = 32
53HASH_VALUE_MAX_LEN = 128
54
55
56class SignMethod(enum.Enum):
57    RSA = 1
58    ECC = 2
59
60
61class PkgHeader(ctypes.Structure):
62    _fields_ = [("digest_method", ctypes.c_ubyte),
63                ("sign_method", ctypes.c_ubyte),
64                ("pkg_type", ctypes.c_ubyte),
65                ("pkg_flags", ctypes.c_ubyte),
66                ("entry_count", ctypes.c_int),
67                ("update_file_version", ctypes.c_int),
68                ("product_update_id", ctypes.c_char_p),
69                ("software_version", ctypes.c_char_p),
70                ("date", ctypes.c_char_p),
71                ("time", ctypes.c_char_p),
72                ("describe_package_id", ctypes.c_char_p)]
73
74
75class PkgComponent(ctypes.Structure):
76    _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN),
77                ("file_path", ctypes.c_char_p),
78                ("component_addr", ctypes.c_char_p),
79                ("version", ctypes.c_char_p),
80                ("size", ctypes.c_int),
81                ("id", ctypes.c_int),
82                ("original_size", ctypes.c_int),
83                ("res_type", ctypes.c_ubyte),
84                ("type", ctypes.c_ubyte),
85                ("flags", ctypes.c_ubyte)]
86
87
88class SignInfo(ctypes.Structure):
89    _fields_ = [("sign_offset", ctypes.c_int),
90                ("hash_len", ctypes.c_int),
91                ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]
92
93
94def create_update_bin():
95    """
96    Call the interface to generate the update.bin file.
97    :return update_bin_obj: Update file object.
98                            If exception occurs, return False.
99    """
100    update_bin_obj = tempfile.NamedTemporaryFile(
101        dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")
102
103    head_value_list = OPTIONS_MANAGER.head_info_list
104    component_dict = OPTIONS_MANAGER.component_info_dict
105    full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
106    full_img_list = OPTIONS_MANAGER.full_img_list
107
108    if not OPTIONS_MANAGER.not_l2:
109        if OPTIONS_MANAGER.partition_file_obj is not None:
110            all_image_name = \
111                EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + full_img_list
112        else:
113            all_image_name = EXTEND_COMPONENT_LIST + full_img_list
114    else:
115        all_image_name = full_img_list
116    sort_component_dict = collect.OrderedDict()
117    for each_image_name in all_image_name:
118        sort_component_dict[each_image_name] = \
119            component_dict.get(each_image_name)
120    component_dict = copy.deepcopy(sort_component_dict)
121    head_list = get_head_list(len(component_dict), head_value_list)
122
123    component_list = get_component_list(
124        full_image_file_obj_list, component_dict)
125
126    save_patch = update_bin_obj.name.encode("utf-8")
127    if OPTIONS_MANAGER.private_key == ON_SERVER:
128        private_key = "./update_package.py"
129    else:
130        private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
131
132    if OPTIONS_MANAGER.not_l2:
133        sign_algo = SIGN_ALGO_PSS
134    else:
135        sign_algo = SIGN_ALGO_RSA
136
137    # create bin
138    package = CreatePackage(head_list, component_list, save_patch,
139        OPTIONS_MANAGER.private_key)
140    if not package.create_package():
141        UPDATE_LOGGER.print_log(
142            "Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
143        return False
144
145    UPDATE_LOGGER.print_log(
146        "Create update package .bin complete! path: %s" % update_bin_obj.name)
147    return update_bin_obj
148
149
150def get_component_list(all_image_file_obj_list, component_dict):
151    """
152    Get the list of component information according to
153    the component information structure.
154    :param all_image_file_obj_list: all image object file list
155    :param component_dict: Component information content dict
156    :return component_list: List of component information.
157                            If exception occurs, return False.
158    """
159    pkg_components = PkgComponent * len(component_dict)
160    component_list = pkg_components()
161    if not OPTIONS_MANAGER.not_l2:
162        if OPTIONS_MANAGER.partition_file_obj is not None:
163            extend_component_list = \
164                EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
165            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
166                                OPTIONS_MANAGER.board_list_file_path,
167                                OPTIONS_MANAGER.partition_file_obj.name]
168        else:
169            extend_component_list = EXTEND_COMPONENT_LIST
170            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
171                                OPTIONS_MANAGER.board_list_file_path]
172    else:
173        extend_component_list = []
174        extend_path_list = []
175    idx = 0
176    for key, component in component_dict.items():
177        if idx < len(extend_component_list):
178            file_path = extend_path_list[idx]
179        else:
180            file_path = \
181                all_image_file_obj_list[idx - len(extend_component_list)].name
182        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
183        if not digest:
184            return
185        if component is None:
186            component = copy.copy(COMPONENT_INFO_INNIT)
187            component[0] = key
188        component_list[idx].digest = (ctypes.c_ubyte * 32).from_buffer_copy(
189            binascii.a2b_hex(digest.encode('utf-8')))
190        component_list[idx].file_path = file_path.encode("utf-8")
191        if not OPTIONS_MANAGER.not_l2:
192            component_list[idx].component_addr = \
193                ('/%s' % component[0]).encode("utf-8")
194        else:
195            component_list[idx].component_addr = \
196                ('%s' % component[0]).encode("utf-8")
197        component_list[idx].version = component[4].encode("utf-8")
198        component_list[idx].size = os.path.getsize(file_path)
199        component_list[idx].id = int(component[1])
200        if component[3] == 1:
201            component_list[idx].original_size = os.path.getsize(file_path)
202        else:
203            component_list[idx].original_size = 0
204        component_list[idx].res_type = int(component[2])
205        component_list[idx].type = int(component[3])
206        component_list[idx].flags = IS_DEL
207
208        idx += 1
209    return component_list
210
211
212def get_head_list(component_count, head_value_list):
213    """
214    According to the header structure, get the list of HEAD headers.
215    :param component_count: number of components
216    :param head_value_list: list of header values
217    :return head_list: header list
218    """
219    head_list = PkgHeader()
220    if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256:
221        # PKG_DIGEST_TYPE_SHA384   3,use sha384
222        head_list.digest_method = 3
223    else:
224        # PKG_DIGEST_TYPE_SHA256   2,use sha256
225        head_list.digest_method = 2
226    if OPTIONS_MANAGER.private_key == ON_SERVER:
227        head_list.sign_method = 0
228    else:
229        if OPTIONS_MANAGER.signing_algorithm == "ECC":
230            # signing algorithm use ECC
231            head_list.sign_method = SignMethod.ECC.value
232        else:
233            # signing algorithm use RSA
234            head_list.sign_method = SignMethod.RSA.value
235    head_list.pkg_type = 1
236    if OPTIONS_MANAGER.not_l2:
237        head_list.pkg_flags = 1
238    else:
239        head_list.pkg_flags = 0
240    head_list.entry_count = component_count
241    head_list.update_file_version = int(head_value_list[0])
242    head_list.product_update_id = head_value_list[1].encode("utf-8")
243    head_list.software_version = head_value_list[2].encode("utf-8")
244    head_list.date = head_value_list[3].encode("utf-8")
245    head_list.time = head_value_list[4].encode("utf-8")
246    head_list.describe_package_id = ctypes.c_char_p("update/info.bin".encode())
247    return head_list
248
249
250def get_tools_component_list(count, opera_script_dict):
251    """
252    Get the list of component information according to
253    the component information structure.
254    :param count: number of components
255    :param opera_script_dict: script file name and path dict
256    :return component_list: list of component information.
257                            If exception occurs, return False.
258    """
259    pkg_components = PkgComponent * count
260    component_list = pkg_components()
261    component_value_list = list(opera_script_dict.keys())
262    component_num = 0
263    for i, component in enumerate(component_value_list):
264        component_list[i].file_path = component.encode("utf-8")
265        component_list[i].component_addr = \
266            (opera_script_dict[component]).encode("utf-8")
267        component_num += 1
268    return component_list, component_num
269
270
271def get_tools_head_list(component_count):
272    """
273    According to the header structure, get the list of HEAD headers.
274    :param component_count: number of components
275    :return head_list: header list
276    """
277    head_list = PkgHeader()
278    head_list.digest_method = 0
279    head_list.sign_method = 0
280    head_list.pkg_type = 2
281    head_list.pkg_flags = 0
282    head_list.entry_count = component_count
283    return head_list
284
285
286def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
287    """
288    Server update package signature requires the vendor to
289    implement its own service signature interface, as shown below:
290    ip = ""
291    user_name = ""
292    pass_word = ""
293    signe_jar = ""
294    signing_config = [signe_jar, ip, user_name, pass_word,
295                      hash_code, hash_algorithm]
296    cmd = ' '.join(signing_config)
297    subprocess.Popen(
298        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
299    :param package_path: update package file path
300    :param hash_algorithm: hash algorithm
301    :param hash_code: hash code
302    :return:
303    """
304    UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, "
305                            "Signing hash code: %s" %
306                            (package_path, hash_algorithm, hash_code))
307    signing_content = ""
308    return signing_content.encode()
309
310
311def create_build_tools_zip():
312    """
313    Create the update package file.
314    :param lib: lib object
315    :return:
316    """
317    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
318    tmp_dict = {}
319    for each in SCRIPT_KEY_LIST:
320        tmp_dict[each] = []
321    if opera_script_file_name_dict == tmp_dict:
322        UPDATE_LOGGER.print_log(
323            "Script dict is null!",
324            log_type=UPDATE_LOGGER.ERROR_LOG)
325        return False
326    count = 0
327    opera_script_dict = {}
328    for each_value in opera_script_file_name_dict.values():
329        for each in each_value:
330            opera_script_dict[each[1].name] = each[0]
331            count += 1
332    # other_file_count --> 1(updater_binary) + 1(loadScript.us)
333    other_file_count = 2
334    count += other_file_count
335    if OPTIONS_MANAGER.register_script_file_obj is not None:
336        count += 1
337    head_list = get_tools_head_list(count)
338    component_list, num = \
339        get_tools_component_list(count, opera_script_dict)
340    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
341    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
342    update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
343                                   UPDATE_EXE_FILE_NAME)
344    if not os.path.exists(update_exe_path):
345        UPDATE_LOGGER.print_log(
346            "updater_binary file does not exist!path: %s" % update_exe_path,
347            log_type=UPDATE_LOGGER.ERROR_LOG)
348        return False
349
350    file_obj = tempfile.NamedTemporaryFile(
351        dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
352    files_to_sign = []
353    zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED)
354    # file name will be prefixed by build_tools in hash signed data
355    name_format_str = "build_tools/{}"
356    # add opera_script to build_tools.zip
357    for key, value in opera_script_dict.items():
358        zip_file.write(key, value)
359        files_to_sign += [(key, name_format_str.format(value))]
360
361    # add update_binary to build_tools.zip
362    zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME)
363    files_to_sign += [(update_exe_path, name_format_str.format(UPDATE_EXE_FILE_NAME))]
364
365    # add loadScript to build_tools.zip
366    zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME)
367    files_to_sign += [(total_script_file_obj.name, name_format_str.format(TOTAL_SCRIPT_FILE_NAME))]
368    if OPTIONS_MANAGER.register_script_file_obj is not None:
369        zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME)
370        files_to_sign += [(register_script_file_obj.name, name_format_str.format(REGISTER_SCRIPT_FILE_NAME))]
371
372    if create_hsd_for_build_tools(zip_file, files_to_sign) is False:
373        return False
374    return file_obj
375
376
377def do_zip_update_package():
378    zip_file = zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path,
379                               'w', zipfile.ZIP_DEFLATED, allowZip64=True)
380    # add update.bin to update package
381    zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
382    # add build_tools.zip to update package
383    zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME)
384
385    zip_file.write(OPTIONS_MANAGER.version_mbn_file_path, "version_list")
386    zip_file.write(OPTIONS_MANAGER.board_list_file_path, "board_list")
387
388    if OPTIONS_MANAGER.max_stash_size != 0:
389        max_stash_file_obj = tempfile.NamedTemporaryFile(mode="w+")
390        max_stash_file_obj.write(str(OPTIONS_MANAGER.max_stash_size))
391        max_stash_file_obj.flush()
392        zip_file.write(max_stash_file_obj.name, "all_max_stash")
393
394    for package_patch_zip in OPTIONS_MANAGER.incremental_block_file_obj_dict.values():
395        package_patch_zip.package_block_patch(zip_file)
396
397    for partition, patch_obj in OPTIONS_MANAGER.incremental_image_file_obj_dict.items():
398        zip_file.write(patch_obj.name, "%s.patch.dat" % partition)
399
400    zip_file.close()
401
402
403def create_hsd_for_build_tools(zip_file, files_to_sign):
404    """
405    generate hash signed data for build_tools.zip
406    """
407    generate_signed_data_ext = OPTIONS_MANAGER.init.invoke_event(GENERATE_SIGNED_DATA_EVENT)
408    signed_data = ""
409    # add hash signed data to build_tools.zip
410    if generate_signed_data_ext is False:
411        signed_data = generate_signed_data_default(files_to_sign)
412    else:
413        signed_data = generate_signed_data_ext(files_to_sign)
414    if signed_data == "":
415        UPDATE_LOGGER.print_log("generate_signed_data failed", log_type=UPDATE_LOGGER.ERROR_LOG)
416        zip_file.close()
417        return False
418    zip_file.writestr("hash_signed_data", signed_data)
419    zip_file.close()
420    return True
421
422
423def build_update_package(no_zip, update_package, prelude_script,
424                         verse_script, refrain_script, ending_script):
425    """
426    Create the update package file.
427    :param no_zip: no zip
428    :param update_package: update package path
429    :param prelude_script: prelude object
430    :param verse_script: verse object
431    :param refrain_script: refrain object
432    :param ending_script: ending object
433    :return: If exception occurs, return False.
434    """
435    update_bin_obj = create_update_bin()
436    if update_bin_obj:
437        OPTIONS_MANAGER.update_bin_obj = update_bin_obj
438    else:
439        return False
440
441    if OPTIONS_MANAGER.sd_card :
442        package_type = "sd"
443    elif OPTIONS_MANAGER.source_package :
444        package_type = "diff"
445    else :
446        package_type = "full"
447    if OPTIONS_MANAGER.not_l2:
448        update_file_name = ''.join(
449            ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")])
450    else :
451        update_file_name = ''.join(
452            ["updater_", package_type])
453
454    if not no_zip:
455        update_package_path = os.path.join(
456            update_package, '%s_unsigned.zip' % update_file_name)
457        OPTIONS_MANAGER.update_package_file_path = update_package_path
458
459        create_script(prelude_script, verse_script,
460                      refrain_script, ending_script)
461
462        build_tools_zip_obj = create_build_tools_zip()
463        if build_tools_zip_obj is False:
464            UPDATE_LOGGER.print_log(
465                "Create build tools zip failed!",
466                log_type=UPDATE_LOGGER.ERROR_LOG)
467            return False
468        OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj
469
470        do_zip_update_package()
471
472        signed_package = os.path.join(
473            update_package, "%s.zip" % update_file_name)
474        OPTIONS_MANAGER.signed_package = signed_package
475        if os.path.exists(signed_package):
476            os.remove(signed_package)
477
478        sign_ota_package = \
479            OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
480        if sign_ota_package:
481            sign_result = sign_ota_package()
482        else:
483            sign_result = sign_package()
484
485        if not sign_result:
486            UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
487            return False
488        if os.path.exists(update_package_path):
489            os.remove(update_package_path)
490    else:
491        update_package_path = os.path.join(
492            update_package, '%s.bin' % update_file_name)
493        if os.path.exists(update_package_path):
494            os.remove(update_package_path)
495        OPTIONS_MANAGER.update_package_file_path = update_package_path
496        with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f:
497            content = r_f.read()
498        with open(update_package_path, 'wb') as w_f:
499            w_f.write(content)
500    return True
501
502
503def get_hash_content(file_path, hash_algorithm):
504    """
505    Use SHA256SUM to get the hash value of the file.
506    :param file_path : file path
507    :param hash_algorithm: hash algorithm
508    :return hash_content: hash value
509    """
510    try:
511        cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path]
512    except KeyError:
513        UPDATE_LOGGER.print_log(
514            "Unsupported hash algorithm! %s" % hash_algorithm,
515            log_type=UPDATE_LOGGER.ERROR_LOG)
516        return False
517    if not os.path.exists(file_path):
518        UPDATE_LOGGER.print_log(
519            "%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm],
520            UPDATE_LOGGER.ERROR_LOG)
521        raise RuntimeError
522    process_obj = subprocess.Popen(
523        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
524    process_obj.wait()
525    hash_content = \
526        process_obj.stdout.read().decode(encoding='gbk').split(' ')[0]
527    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
528        UPDATE_LOGGER.print_log(
529            "Get hash content failed! The length of the hash_content is 0!",
530            UPDATE_LOGGER.ERROR_LOG)
531        raise RuntimeError
532    if process_obj.returncode == 0:
533        UPDATE_LOGGER.print_log(
534            "Get hash content success! path: %s" % file_path)
535    return hash_content
536