• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import binascii
17import copy
18import os
19import subprocess
20import tempfile
21import time
22import collections as collect
23import enum
24import ctypes
25import zipfile
26
27from log_exception import UPDATE_LOGGER
28from script_generator import create_script
29from utils import sign_package
30from utils import HASH_CONTENT_LEN_DICT
31from utils import OPTIONS_MANAGER
32from utils import REGISTER_SCRIPT_FILE_NAME
33from utils import ON_SERVER
34from utils import SCRIPT_KEY_LIST
35from utils import EXTEND_OPTIONAL_COMPONENT_LIST
36from utils import COMPONENT_INFO_INNIT
37from utils import UPDATE_EXE_FILE_NAME
38from utils import TOTAL_SCRIPT_FILE_NAME
39from utils import EXTEND_PATH_EVENT
40from utils import LINUX_HASH_ALGORITHM_DICT
41from utils import UPDATE_BIN_FILE_NAME
42from utils import BUILD_TOOLS_FILE_NAME
43from utils import SIGN_PACKAGE_EVENT
44from utils import CHECK_BINARY_EVENT
45from utils import ZIP_EVENT
46from utils import GENERATE_SIGNED_DATA_EVENT
47from utils import DECOUPLED_EVENT
48from utils import get_extend_path_list
49from create_update_package import CreatePackage
50from create_update_package import SIGN_ALGO_RSA
51from create_update_package import SIGN_ALGO_PSS
52from create_signed_data import generate_signed_data_default
53
54IS_DEL = 0
55SIGNING_LENGTH_256 = 256
56DIGEST_LEN = 32
57HASH_VALUE_MAX_LEN = 128
58
59
60class SignMethod(enum.Enum):
61    RSA = 1
62    ECC = 2
63
64
65class PkgHeader(ctypes.Structure):
66    _fields_ = [("digest_method", ctypes.c_ubyte),
67                ("sign_method", ctypes.c_ubyte),
68                ("pkg_type", ctypes.c_ubyte),
69                ("pkg_flags", ctypes.c_ubyte),
70                ("entry_count", ctypes.c_int),
71                ("update_file_version", ctypes.c_int),
72                ("product_update_id", ctypes.c_char_p),
73                ("software_version", ctypes.c_char_p),
74                ("date", ctypes.c_char_p),
75                ("time", ctypes.c_char_p),
76                ("describe_package_id", ctypes.c_char_p)]
77
78
79class PkgComponent(ctypes.Structure):
80    _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN),
81                ("file_path", ctypes.c_char_p),
82                ("component_addr", ctypes.c_char_p),
83                ("version", ctypes.c_char_p),
84                ("size", ctypes.c_int),
85                ("id", ctypes.c_int),
86                ("original_size", ctypes.c_int),
87                ("res_type", ctypes.c_ubyte),
88                ("type", ctypes.c_ubyte),
89                ("flags", ctypes.c_ubyte)]
90
91
92class SignInfo(ctypes.Structure):
93    _fields_ = [("sign_offset", ctypes.c_int),
94                ("hash_len", ctypes.c_int),
95                ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]
96
97
98def create_update_bin():
99    """
100    Call the interface to generate the update.bin file.
101    :return update_bin_obj: Update file object.
102                            If exception occurs, return False.
103    """
104    update_bin_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")
105
106    head_value_list = OPTIONS_MANAGER.head_info_list
107    component_dict = OPTIONS_MANAGER.component_info_dict
108    full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
109    full_img_list = OPTIONS_MANAGER.full_img_list
110
111    extend_component_list = get_extend_path_list()
112    if not OPTIONS_MANAGER.not_l2:
113        if OPTIONS_MANAGER.partition_file_obj is not None:
114            all_image_name = \
115                extend_component_list + EXTEND_OPTIONAL_COMPONENT_LIST + full_img_list
116        else:
117            all_image_name = extend_component_list + full_img_list
118    else:
119        all_image_name = full_img_list
120    sort_component_dict = collect.OrderedDict()
121    for each_image_name in all_image_name:
122        sort_component_dict[each_image_name] = component_dict.get(each_image_name)
123    component_dict = copy.deepcopy(sort_component_dict)
124    head_list = get_head_list(len(component_dict), head_value_list)
125
126    component_list = get_component_list(
127        full_image_file_obj_list, component_dict)
128
129    save_patch = update_bin_obj.name.encode("utf-8")
130    if OPTIONS_MANAGER.private_key == ON_SERVER:
131        private_key = "./update_package.py"
132    else:
133        private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
134
135    if OPTIONS_MANAGER.not_l2:
136        sign_algo = SIGN_ALGO_PSS
137    else:
138        sign_algo = SIGN_ALGO_RSA
139
140    # create bin
141    package = CreatePackage(head_list, component_list, save_patch, OPTIONS_MANAGER.private_key)
142    if not package.create_package():
143        UPDATE_LOGGER.print_log("Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
144        return False
145
146    UPDATE_LOGGER.print_log("Create update package .bin complete! path: %s" % update_bin_obj.name)
147    return update_bin_obj
148
149
150def get_component_list(all_image_file_obj_list, component_dict):
151    """
152    Get the list of component information according to
153    the component information structure.
154    :param all_image_file_obj_list: all image object file list
155    :param component_dict: Component information content dict
156    :return component_list: List of component information.
157                            If exception occurs, return False.
158    """
159    pkg_components = PkgComponent * len(component_dict)
160    component_list = pkg_components()
161    extend_list = get_extend_path_list()
162    if not OPTIONS_MANAGER.not_l2:
163        if OPTIONS_MANAGER.partition_file_obj is not None:
164            extend_component_list = extend_list + EXTEND_OPTIONAL_COMPONENT_LIST
165            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
166                                OPTIONS_MANAGER.board_list_file_path,
167                                OPTIONS_MANAGER.partition_file_obj.name]
168        else:
169            extend_component_list = extend_list
170            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
171                                OPTIONS_MANAGER.board_list_file_path]
172    else:
173        extend_component_list = []
174        extend_path_list = []
175    get_path_list = OPTIONS_MANAGER.init.invoke_event(EXTEND_PATH_EVENT)
176    if get_path_list:
177        extend_path_list = get_path_list()
178    idx = 0
179    for key, component in component_dict.items():
180        if idx < len(extend_component_list):
181            file_path = extend_path_list[idx]
182        else:
183            file_path = all_image_file_obj_list[idx - len(extend_component_list)].name
184        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
185        if not digest:
186            return
187        if component is None:
188            component = copy.copy(COMPONENT_INFO_INNIT)
189            component[0] = key
190        component_list[idx].digest = (ctypes.c_ubyte * 32).from_buffer_copy(
191            binascii.a2b_hex(digest.encode('utf-8')))
192        component_list[idx].file_path = file_path.encode("utf-8")
193        if not OPTIONS_MANAGER.not_l2:
194            component_list[idx].component_addr = ('/%s' % component[0]).encode("utf-8")
195        else:
196            component_list[idx].component_addr = ('%s' % component[0]).encode("utf-8")
197        component_list[idx].version = component[4].encode("utf-8")
198        component_list[idx].size = os.path.getsize(file_path)
199        component_list[idx].id = int(component[1])
200        if component[3] == 1:
201            component_list[idx].original_size = os.path.getsize(file_path)
202        else:
203            component_list[idx].original_size = 0
204        component_list[idx].res_type = int(component[2])
205        component_list[idx].type = int(component[3])
206        component_list[idx].flags = IS_DEL
207
208        idx += 1
209    return component_list
210
211
212def get_head_list(component_count, head_value_list):
213    """
214    According to the header structure, get the list of HEAD headers.
215    :param component_count: number of components
216    :param head_value_list: list of header values
217    :return head_list: header list
218    """
219    head_list = PkgHeader()
220    if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256:
221        # PKG_DIGEST_TYPE_SHA384   3,use sha384
222        head_list.digest_method = 3
223    else:
224        # PKG_DIGEST_TYPE_SHA256   2,use sha256
225        head_list.digest_method = 2
226    if OPTIONS_MANAGER.private_key == ON_SERVER:
227        head_list.sign_method = 0
228    else:
229        if OPTIONS_MANAGER.signing_algorithm == "ECC":
230            # signing algorithm use ECC
231            head_list.sign_method = SignMethod.ECC.value
232        else:
233            # signing algorithm use RSA
234            head_list.sign_method = SignMethod.RSA.value
235    head_list.pkg_type = 1
236    if OPTIONS_MANAGER.not_l2:
237        head_list.pkg_flags = 1
238    else:
239        head_list.pkg_flags = 0
240    head_list.entry_count = component_count
241    head_list.update_file_version = int(head_value_list[0])
242    head_list.product_update_id = head_value_list[1].encode("utf-8")
243    head_list.software_version = head_value_list[2].encode("utf-8")
244    head_list.date = head_value_list[3].encode("utf-8")
245    head_list.time = head_value_list[4].encode("utf-8")
246    head_list.describe_package_id = ctypes.c_char_p("update/info.bin".encode())
247    return head_list
248
249
250def get_tools_component_list(count, opera_script_dict):
251    """
252    Get the list of component information according to
253    the component information structure.
254    :param count: number of components
255    :param opera_script_dict: script file name and path dict
256    :return component_list: list of component information.
257                            If exception occurs, return False.
258    """
259    pkg_components = PkgComponent * count
260    component_list = pkg_components()
261    component_value_list = list(opera_script_dict.keys())
262    component_num = 0
263    for i, component in enumerate(component_value_list):
264        component_list[i].file_path = component.encode("utf-8")
265        component_list[i].component_addr = \
266            (opera_script_dict[component]).encode("utf-8")
267        component_num += 1
268    return component_list, component_num
269
270
271def get_tools_head_list(component_count):
272    """
273    According to the header structure, get the list of HEAD headers.
274    :param component_count: number of components
275    :return head_list: header list
276    """
277    head_list = PkgHeader()
278    head_list.digest_method = 0
279    head_list.sign_method = 0
280    head_list.pkg_type = 2
281    head_list.pkg_flags = 0
282    head_list.entry_count = component_count
283    return head_list
284
285
286def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
287    """
288    Server update package signature requires the vendor to
289    implement its own service signature interface, as shown below:
290    ip = ""
291    user_name = ""
292    pass_word = ""
293    signe_jar = ""
294    signing_config = [signe_jar, ip, user_name, pass_word,
295                      hash_code, hash_algorithm]
296    cmd = ' '.join(signing_config)
297    subprocess.Popen(
298        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
299    :param package_path: update package file path
300    :param hash_algorithm: hash algorithm
301    :param hash_code: hash code
302    :return:
303    """
304    UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, "
305                            "Signing hash code: %s" %
306                            (package_path, hash_algorithm, hash_code))
307    signing_content = ""
308    return signing_content.encode()
309
310
311def create_build_tools_zip():
312    """
313    Create the update package file.
314    :param lib: lib object
315    :return:
316    """
317    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
318    tmp_dict = {}
319    for each in SCRIPT_KEY_LIST:
320        tmp_dict[each] = []
321    if opera_script_file_name_dict == tmp_dict:
322        UPDATE_LOGGER.print_log(
323            "Script dict is null!",
324            log_type=UPDATE_LOGGER.ERROR_LOG)
325        return False
326    count = 0
327    opera_script_dict = {}
328    for each_value in opera_script_file_name_dict.values():
329        for each in each_value:
330            opera_script_dict[each[1].name] = each[0]
331            count += 1
332    # other_file_count --> 1(updater_binary) + 1(loadScript.us)
333    other_file_count = 2
334    count += other_file_count
335    if OPTIONS_MANAGER.register_script_file_obj is not None:
336        count += 1
337    head_list = get_tools_head_list(count)
338    component_list, num = get_tools_component_list(count, opera_script_dict)
339    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
340    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
341    update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, UPDATE_EXE_FILE_NAME)
342
343    file_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
344    files_to_sign = []
345    zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED)
346    # file name will be prefixed by build_tools in hash signed data
347    name_format_str = "build_tools/{}"
348    # add opera_script to build_tools.zip
349    for key, value in opera_script_dict.items():
350        zip_file.write(key, value)
351        files_to_sign += [(key, name_format_str.format(value))]
352    binary_check = OPTIONS_MANAGER.init.invoke_event(CHECK_BINARY_EVENT)
353    if callable(binary_check) is False or (callable(binary_check) and binary_check() is False):
354        if not os.path.exists(update_exe_path):
355            UPDATE_LOGGER.print_log("updater_binary file does not exist!path: %s" % update_exe_path,
356                log_type=UPDATE_LOGGER.ERROR_LOG)
357            return False
358        # add update_binary to build_tools.zip
359        zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME)
360        files_to_sign += [(update_exe_path, name_format_str.format(UPDATE_EXE_FILE_NAME))]
361
362    # add loadScript to build_tools.zip
363    zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME)
364    files_to_sign += [(total_script_file_obj.name, name_format_str.format(TOTAL_SCRIPT_FILE_NAME))]
365    if OPTIONS_MANAGER.register_script_file_obj is not None:
366        zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME)
367        files_to_sign += [(register_script_file_obj.name, name_format_str.format(REGISTER_SCRIPT_FILE_NAME))]
368
369    if create_hsd_for_build_tools(zip_file, files_to_sign) is False:
370        zip_file.close()
371        return False
372    zip_file.close()
373    return file_obj
374
375
376def do_sign_package(update_package, update_file_name):
377    signed_package = os.path.join(
378            update_package, "%s.zip" % update_file_name)
379    OPTIONS_MANAGER.signed_package = signed_package
380    if os.path.exists(signed_package):
381        os.remove(signed_package)
382
383    sign_ota_package = \
384        OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
385    if sign_ota_package:
386        return sign_ota_package()
387    else:
388        return sign_package()
389
390
391def get_update_file_name():
392    if OPTIONS_MANAGER.sd_card :
393        package_type = "sd"
394    elif OPTIONS_MANAGER.source_package :
395        package_type = "diff"
396    else :
397        package_type = "full"
398    if OPTIONS_MANAGER.not_l2:
399        update_file_name = ''.join(
400            ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")])
401    else :
402        update_file_name = ''.join(
403            ["updater_", package_type])
404    return update_file_name
405
406
407def do_zip_update_package():
408    zip_file = zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path,
409                               'w', zipfile.ZIP_DEFLATED, allowZip64=True)
410    # add files to update package
411    do_add_files = OPTIONS_MANAGER.init.invoke_event(ZIP_EVENT)
412    if callable(do_add_files) and do_add_files(zip_file) is False:
413        UPDATE_LOGGER.print_log("add files fail", UPDATE_LOGGER.ERROR_LOG)
414        zip_file.close()
415        return False
416    # add update.bin to update package
417    zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
418    # add build_tools.zip to update package
419    zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME)
420
421    zip_file.write(OPTIONS_MANAGER.board_list_file_path, "board_list")
422    decouple_res = OPTIONS_MANAGER.init.invoke_event(DECOUPLED_EVENT)
423    if decouple_res is False:
424        zip_file.write(OPTIONS_MANAGER.version_mbn_file_path, "version_list")
425
426    if OPTIONS_MANAGER.max_stash_size != 0:
427        max_stash_file_obj = tempfile.NamedTemporaryFile(mode="w+")
428        max_stash_file_obj.write(str(OPTIONS_MANAGER.max_stash_size))
429        max_stash_file_obj.flush()
430        zip_file.write(max_stash_file_obj.name, "all_max_stash")
431
432    for package_patch_zip in OPTIONS_MANAGER.incremental_block_file_obj_dict.values():
433        package_patch_zip.package_block_patch(zip_file)
434
435    for partition, patch_obj in OPTIONS_MANAGER.incremental_image_file_obj_dict.items():
436        zip_file.write(patch_obj.name, "%s.patch.dat" % partition)
437
438    zip_file.close()
439    return True
440
441
442def create_hsd_for_build_tools(zip_file, files_to_sign):
443    """
444    generate hash signed data for build_tools.zip
445    """
446    generate_signed_data_ext = OPTIONS_MANAGER.init.invoke_event(GENERATE_SIGNED_DATA_EVENT)
447    signed_data = ""
448    # add hash signed data to build_tools.zip
449    if generate_signed_data_ext is False:
450        signed_data = generate_signed_data_default(files_to_sign)
451    else:
452        signed_data = generate_signed_data_ext(files_to_sign)
453    if signed_data == "":
454        UPDATE_LOGGER.print_log("generate_signed_data failed", log_type=UPDATE_LOGGER.ERROR_LOG)
455        zip_file.close()
456        return False
457    zip_file.writestr("hash_signed_data", signed_data)
458    return True
459
460
461def build_update_package(no_zip, update_package, prelude_script,
462                         verse_script, refrain_script, ending_script):
463    """
464    Create the update package file.
465    :param no_zip: no zip
466    :param update_package: update package path
467    :param prelude_script: prelude object
468    :param verse_script: verse object
469    :param refrain_script: refrain object
470    :param ending_script: ending object
471    :return: If exception occurs, return False.
472    """
473    update_bin_obj = create_update_bin()
474    if update_bin_obj:
475        OPTIONS_MANAGER.update_bin_obj = update_bin_obj
476    else:
477        return False
478
479    update_file_name = get_update_file_name()
480
481    if not no_zip:
482        update_package_path = os.path.join(
483            update_package, '%s_unsigned.zip' % update_file_name)
484        OPTIONS_MANAGER.update_package_file_path = update_package_path
485
486        create_script(prelude_script, verse_script,
487                      refrain_script, ending_script)
488
489        build_tools_zip_obj = create_build_tools_zip()
490        if build_tools_zip_obj is False:
491            UPDATE_LOGGER.print_log(
492                "Create build tools zip failed!",
493                log_type=UPDATE_LOGGER.ERROR_LOG)
494            return False
495        OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj
496
497        if not do_zip_update_package():
498            UPDATE_LOGGER.print_log("Zip update package fail", UPDATE_LOGGER.ERROR_LOG)
499            return False
500
501        sign_result = do_sign_package(update_package, update_file_name)
502
503        if not sign_result:
504            UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
505            return False
506        if os.path.exists(update_package_path):
507            os.remove(update_package_path)
508    else:
509        update_package_path = os.path.join(
510            update_package, '%s.bin' % update_file_name)
511        if os.path.exists(update_package_path):
512            os.remove(update_package_path)
513        OPTIONS_MANAGER.update_package_file_path = update_package_path
514        with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f:
515            content = r_f.read()
516        with open(update_package_path, 'wb') as w_f:
517            w_f.write(content)
518    return True
519
520
521def get_hash_content(file_path, hash_algorithm):
522    """
523    Use SHA256SUM to get the hash value of the file.
524    :param file_path : file path
525    :param hash_algorithm: hash algorithm
526    :return hash_content: hash value
527    """
528    try:
529        cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path]
530    except KeyError:
531        UPDATE_LOGGER.print_log(
532            "Unsupported hash algorithm! %s" % hash_algorithm,
533            log_type=UPDATE_LOGGER.ERROR_LOG)
534        return False
535    if not os.path.exists(file_path):
536        UPDATE_LOGGER.print_log(
537            "%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm],
538            UPDATE_LOGGER.ERROR_LOG)
539        raise RuntimeError
540    process_obj = subprocess.Popen(
541        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
542    process_obj.wait()
543    hash_content = \
544        process_obj.stdout.read().decode(encoding='gbk').split(' ')[0]
545    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
546        UPDATE_LOGGER.print_log(
547            "Get hash content failed! The length of the hash_content is 0!",
548            UPDATE_LOGGER.ERROR_LOG)
549        raise RuntimeError
550    if process_obj.returncode == 0:
551        UPDATE_LOGGER.print_log(
552            "Get hash content success! path: %s" % file_path)
553    return hash_content
554