• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import binascii
17import copy
18import os
19import subprocess
20import tempfile
21import time
22import collections as collect
23import enum
24import ctypes
25
26from log_exception import UPDATE_LOGGER
27from script_generator import create_script
28from utils import HASH_CONTENT_LEN_DICT
29from utils import OPTIONS_MANAGER
30from utils import REGISTER_SCRIPT_FILE_NAME
31from utils import ON_SERVER
32from utils import SCRIPT_KEY_LIST
33from utils import EXTEND_OPTIONAL_COMPONENT_LIST
34from utils import COMPONENT_INFO_INNIT
35from utils import UPDATE_EXE_FILE_NAME
36from utils import TOTAL_SCRIPT_FILE_NAME
37from utils import EXTEND_COMPONENT_LIST
38from utils import LINUX_HASH_ALGORITHM_DICT
39from utils import BUILD_TOOLS_FILE_NAME
40from utils import get_lib_api
41
# Value written to the ``flags`` field of every component built by
# get_component_list.
IS_DEL = 0
# Signing length that selects SHA-256 (digest_method 2); any other value of
# OPTIONS_MANAGER.signing_length selects SHA-384 (digest_method 3).
SIGNING_LENGTH_256 = 256
# Number of bytes in the PkgComponent.digest array.
DIGEST_LEN = 32
# SignInfo.hash_code is sized HASH_VALUE_MAX_LEN + 1 bytes.
HASH_VALUE_MAX_LEN = 128
46
47
@enum.unique
class SignMethod(enum.Enum):
    """
    Signing algorithm identifiers.
    The member values are written to PkgHeader.sign_method when the package
    is signed with a local private key.
    """
    RSA = 1
    ECC = 2
51
52
class PkgHeader(ctypes.Structure):
    """
    Package header passed by pointer to the native CreatePackage and
    CreatePackageWithSignInfo calls.
    NOTE(review): presumably mirrors a struct declared by the native
    library, so field order and types must stay as they are - confirm
    against the library header before changing anything here.
    """
    # How this module fills the fields:
    #   digest_method       2 (sha256) or 3 (sha384); 0 for the tools header
    #   sign_method         a SignMethod value, or 0 when signing on server
    #   pkg_type            1 for update.bin, 2 for the tools/zip packages
    #   pkg_flags           1 when OPTIONS_MANAGER.not_l2 is set, else 0
    #   entry_count         number of components in the package
    #   update_file_version, product_update_id, software_version, date, time
    #                       taken from OPTIONS_MANAGER.head_info_list[0..4]
    #   describe_package_id b"update/info.bin" for update.bin
    _fields_ = [("digest_method", ctypes.c_ubyte),
                ("sign_method", ctypes.c_ubyte),
                ("pkg_type", ctypes.c_ubyte),
                ("pkg_flags", ctypes.c_ubyte),
                ("entry_count", ctypes.c_int),
                ("update_file_version", ctypes.c_int),
                ("product_update_id", ctypes.c_char_p),
                ("software_version", ctypes.c_char_p),
                ("date", ctypes.c_char_p),
                ("time", ctypes.c_char_p),
                ("describe_package_id", ctypes.c_char_p)]
65
66
class PkgComponent(ctypes.Structure):
    """
    Description of one file packed into a package; arrays of this
    structure are passed to the native CreatePackage calls.
    NOTE(review): presumably mirrors a struct declared by the native
    library, so field order and types must stay as they are - confirm
    against the library header before changing anything here.
    """
    # How this module fills the fields:
    #   digest         binary hash of the file (DIGEST_LEN bytes)
    #   file_path      path of the file on the build host
    #   component_addr name of the component inside the package
    #   version        component[4] of the component info
    #   size           size of the file in bytes
    #   id             int(component[1])
    #   original_size  file size or 0, depending on the component type
    #   res_type       int(component[2])
    #   type           int(component[3])
    #   flags          IS_DEL
    _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN),
                ("file_path", ctypes.c_char_p),
                ("component_addr", ctypes.c_char_p),
                ("version", ctypes.c_char_p),
                ("size", ctypes.c_int),
                ("id", ctypes.c_int),
                ("original_size", ctypes.c_int),
                ("res_type", ctypes.c_ubyte),
                ("type", ctypes.c_ubyte),
                ("flags", ctypes.c_ubyte)]
78
79
class SignInfo(ctypes.Structure):
    """
    Output structure passed by pointer to CreatePackageWithSignInfo.
    After the call this module reads ``sign_offset`` as the position at
    which the signature is written and ``hash_code`` as the hash handed to
    the signing service; ``hash_len`` is not read in this module.
    NOTE(review): presumably mirrors a struct declared by the native
    library - confirm against the library header before changing it.
    """
    _fields_ = [("sign_offset", ctypes.c_int),
                ("hash_len", ctypes.c_int),
                ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]
84
85
def create_update_bin():
    """
    Call the interface to generate the update.bin file.
    The file is written to a named temporary file; the caller owns the
    returned file object (closing it removes the file).
    :return: tuple (update_bin_obj, lib) - the temporary update.bin file
             object and the library handle returned by get_lib_api().
    :raise RuntimeError: the component list could not be built.
    :raise OSError: signing the package on the server failed.
    """
    update_bin_obj = tempfile.NamedTemporaryFile(prefix="update_bin-")

    head_value_list = OPTIONS_MANAGER.head_info_list
    component_dict = OPTIONS_MANAGER.component_info_dict
    full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
    full_img_list = OPTIONS_MANAGER.full_img_list
    incremental_img_list = OPTIONS_MANAGER.incremental_img_list
    incremental_image_file_obj_list = \
        OPTIONS_MANAGER.incremental_image_file_obj_list

    all_image_file_obj_list = \
        incremental_image_file_obj_list + full_image_file_obj_list
    if not OPTIONS_MANAGER.not_l2:
        if OPTIONS_MANAGER.partition_file_obj is not None:
            all_image_name = \
                EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + \
                incremental_img_list + full_img_list
        else:
            all_image_name = \
                EXTEND_COMPONENT_LIST + incremental_img_list + full_img_list
    else:
        all_image_name = \
            incremental_img_list + full_img_list

    # Re-order the component info so that it follows the packing order;
    # names without component info map to None.
    sort_component_dict = collect.OrderedDict()
    for each_image_name in all_image_name:
        sort_component_dict[each_image_name] = \
            component_dict.get(each_image_name)
    component_dict = copy.deepcopy(sort_component_dict)
    head_list = get_head_list(len(component_dict), head_value_list)

    component_list = get_component_list(
        all_image_file_obj_list, component_dict)
    if component_list is None:
        # get_component_list gives up when a file cannot be hashed; do not
        # hand a NULL component array to the native library.
        update_bin_obj.close()
        UPDATE_LOGGER.print_log(
            "Get component list failed!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError

    save_patch = update_bin_obj.name.encode("utf-8")
    sign_info = SignInfo()
    # The key path must be bytes: ctypes marshals a str argument as
    # wchar_t*, not as the char* a C path parameter expects.
    if OPTIONS_MANAGER.private_key == ON_SERVER:
        private_key = "./update_package.py".encode("utf-8")
    else:
        private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
    lib = get_lib_api()
    lib_l1 = get_lib_api(is_l2=False)
    if OPTIONS_MANAGER.not_l2:
        lib_l1.CreatePackageWithSignInfo(
            ctypes.pointer(head_list), component_list, save_patch,
            private_key, ctypes.pointer(sign_info))

        offset = sign_info.sign_offset
        # hash_code is a fixed-size buffer; drop the NUL padding so that
        # only the hash text itself is handed to the signing service.
        hash_code = \
            bytes(sign_info.hash_code).rstrip(b"\x00").decode('ascii')
    else:
        # NOTE(review): this branch passes OPTIONS_MANAGER.private_key even
        # when signing on the server - kept as is, confirm it is intended.
        lib.CreatePackage(
            ctypes.pointer(head_list), component_list, save_patch,
            OPTIONS_MANAGER.private_key.encode("utf-8"))
        offset = 0
        # Same type (str) as in the branch above.
        hash_code = ""

    if OPTIONS_MANAGER.private_key == ON_SERVER:
        signing_package(update_bin_obj.name,
                        OPTIONS_MANAGER.hash_algorithm, hash_code=hash_code,
                        position=offset)

    UPDATE_LOGGER.print_log(".bin package signing success!")
    UPDATE_LOGGER.print_log(
        "Create update package .bin complete! path: %s" % update_bin_obj.name)
    return update_bin_obj, lib
156
157
def get_component_list(all_image_file_obj_list, component_dict):
    """
    Get the list of component information according to
    the component information structure.
    The first entries of component_dict are the extended components
    (version/board list and, if present, the partition file); the
    remaining entries are matched by position with
    all_image_file_obj_list.
    :param all_image_file_obj_list: all image object file list
    :param component_dict: Component information content dict
    :return component_list: ctypes array of PkgComponent.
                            None if the hash of a file cannot be computed.
    """
    pkg_components = PkgComponent * len(component_dict)
    component_list = pkg_components()
    if not OPTIONS_MANAGER.not_l2:
        if OPTIONS_MANAGER.partition_file_obj is not None:
            extend_component_list = \
                EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                                OPTIONS_MANAGER.board_list_file_path,
                                OPTIONS_MANAGER.partition_file_obj.name]
        else:
            extend_component_list = EXTEND_COMPONENT_LIST
            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                                OPTIONS_MANAGER.board_list_file_path]
    else:
        extend_component_list = []
        extend_path_list = []
    extend_count = len(extend_component_list)
    digest_type = ctypes.c_ubyte * DIGEST_LEN
    for idx, (key, component) in enumerate(component_dict.items()):
        if idx < extend_count:
            file_path = extend_path_list[idx]
        else:
            file_path = all_image_file_obj_list[idx - extend_count].name
        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
        if digest is None:
            return None
        if component is None:
            # No component info configured: fall back to the defaults and
            # use the image name as the component name.
            component = copy.copy(COMPONENT_INFO_INNIT)
            component[0] = key
        # The component type is converted with int() before it is used, so
        # compare the converted value: comparing the raw value with 1 never
        # matches when the type is stored as a string.
        component_type = int(component[3])
        file_size = os.path.getsize(file_path)

        entry = component_list[idx]
        # Only the first DIGEST_LEN bytes of the binary digest are stored.
        entry.digest = digest_type.from_buffer_copy(
            binascii.a2b_hex(digest.encode('utf-8')))
        entry.file_path = file_path.encode("utf-8")
        if not OPTIONS_MANAGER.not_l2:
            entry.component_addr = ('/%s' % component[0]).encode("utf-8")
        else:
            entry.component_addr = ('%s' % component[0]).encode("utf-8")
        entry.version = component[4].encode("utf-8")
        entry.size = file_size
        entry.id = int(component[1])
        entry.original_size = file_size if component_type == 1 else 0
        entry.res_type = int(component[2])
        entry.type = component_type
        entry.flags = IS_DEL
    return component_list
218
219
def get_head_list(component_count, head_value_list):
    """
    Build the PkgHeader of the update.bin package.
    :param component_count: number of components
    :param head_value_list: list of header values; items 0-4 are the
                            update file version, product update id,
                            software version, date and time
    :return head_list: header list
    """
    header = PkgHeader()

    # PKG_DIGEST_TYPE_SHA256 is 2, PKG_DIGEST_TYPE_SHA384 is 3.
    use_sha256 = OPTIONS_MANAGER.signing_length == SIGNING_LENGTH_256
    header.digest_method = 2 if use_sha256 else 3

    if OPTIONS_MANAGER.private_key == ON_SERVER:
        header.sign_method = 0
    elif OPTIONS_MANAGER.signing_algorithm == "ECC":
        header.sign_method = SignMethod.ECC.value
    else:
        # Every other signing algorithm is treated as RSA.
        header.sign_method = SignMethod.RSA.value

    header.pkg_type = 1
    header.pkg_flags = 1 if OPTIONS_MANAGER.not_l2 else 0
    header.entry_count = component_count

    header.update_file_version = int(head_value_list[0])
    header.product_update_id = head_value_list[1].encode("utf-8")
    header.software_version = head_value_list[2].encode("utf-8")
    header.date = head_value_list[3].encode("utf-8")
    header.time = head_value_list[4].encode("utf-8")
    header.describe_package_id = ctypes.c_char_p("update/info.bin".encode())
    return header
256
257
def get_tools_component_list(count, opera_script_dict):
    """
    Build the PkgComponent array for the script files of the build tools
    package.
    :param count: number of components to allocate
    :param opera_script_dict: dict mapping the script file path to the
                              name of the script inside the package
    :return: tuple (component_list, component_num) - the allocated array
             and the number of leading entries that were filled in
    """
    component_list = (PkgComponent * count)()
    component_num = 0
    for position, (script_path, script_name) in \
            enumerate(opera_script_dict.items()):
        component_list[position].file_path = script_path.encode("utf-8")
        component_list[position].component_addr = \
            script_name.encode("utf-8")
        component_num = position + 1
    return component_list, component_num
277
278
def get_tools_head_list(component_count):
    """
    Build the PkgHeader of the build tools package.
    :param component_count: number of components
    :return head_list: header list
    """
    # No digest and no signature; package type 2.
    return PkgHeader(
        digest_method=0,
        sign_method=0,
        pkg_type=2,
        pkg_flags=0,
        entry_count=component_count)
292
293
def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
    """
    Placeholder for signing an update package on a server.
    The vendor has to implement its own service signature interface here,
    for example:
    ip = ""
    user_name = ""
    pass_word = ""
    signe_jar = ""
    signing_config = [signe_jar, ip, user_name, pass_word,
                      hash_code, hash_algorithm]
    subprocess.Popen(
        signing_config, shell=False,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    This default implementation only logs the request.
    :param package_path: update package file path
    :param hash_algorithm: hash algorithm
    :param hash_code: hash code
    :return: signature content as bytes; empty in this implementation
    """
    log_values = (package_path, hash_algorithm, hash_code)
    log_format = "Signing %s, hash algorithm is: %s, " \
                 "Signing hash code: %s"
    UPDATE_LOGGER.print_log(log_format % log_values)
    return "".encode()
317
318
def signing_package(package_path, hash_algorithm, hash_code=None,
                    position=0, package_type='.bin'):
    """
    Update package signature.
    The signature is obtained from get_signing_from_server and written
    into the package: at ``position`` when it is not 0, otherwise appended
    to the end of the file.
    :param package_path: update package file path
    :param hash_algorithm: hash algorithm
    :param hash_code: hash code
    :param position: signature write location, 0 means append
    :param package_type: the type of package,.bin/.zip
    :return: True on success
    :raise OSError: getting or writing the signature failed; the original
                    exception is attached as the cause
    """
    try:
        signing_content = get_signing_from_server(
            package_path, hash_algorithm, hash_code)
        if position != 0:
            with open(package_path, mode='rb+') as f_r:
                f_r.seek(position)
                f_r.write(signing_content)
        else:
            with open(package_path, mode='ab') as f_w:
                f_w.write(signing_content)
        return True
    except (OSError, TypeError) as err:
        message = "%s package signing failed!" % package_type
        UPDATE_LOGGER.print_log(message, log_type=UPDATE_LOGGER.ERROR_LOG)
        # Keep the exception type callers expect, but preserve the message
        # and the root cause instead of raising a bare OSError.
        raise OSError(message) from err
344
345
def create_build_tools_zip(lib):
    """
    Create the build tools package.
    It contains the operation scripts, the updater binary, the total
    script and, if present, the register script.
    :param lib: lib object
    :return: temporary file object of the created package,
             False if there are no scripts or the updater binary is missing
    """
    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
    empty_script_dict = {each: [] for each in SCRIPT_KEY_LIST}
    if opera_script_file_name_dict == empty_script_dict:
        UPDATE_LOGGER.print_log(
            "Script dict is null!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    count = 0
    opera_script_dict = {}
    for each_value in opera_script_file_name_dict.values():
        for each in each_value:
            # each is (name inside the package, script file object).
            opera_script_dict[each[1].name] = each[0]
            count += 1
    # other_file_count --> 1(updater_binary) + 1(loadScript.us)
    other_file_count = 2
    count += other_file_count
    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
    if register_script_file_obj is not None:
        count += 1
    head_list = get_tools_head_list(count)
    component_list, num = \
        get_tools_component_list(count, opera_script_dict)
    update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
                                   UPDATE_EXE_FILE_NAME)
    if not os.path.exists(update_exe_path):
        UPDATE_LOGGER.print_log(
            "updater_binary file does not exist!path: %s" % update_exe_path,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    file_obj = tempfile.NamedTemporaryFile(prefix="build_tools-")
    file_save_patch = file_obj.name.encode("utf-8")

    # The fixed files follow the num script entries already filled in.
    component_list[num].file_path = update_exe_path.encode("utf-8")
    component_list[num].component_addr = \
        UPDATE_EXE_FILE_NAME.encode("utf-8")
    component_list[num + 1].file_path = \
        total_script_file_obj.name.encode("utf-8")
    component_list[num + 1].component_addr = \
        TOTAL_SCRIPT_FILE_NAME.encode("utf-8")

    if register_script_file_obj is not None:
        component_list[num + 2].file_path = \
            register_script_file_obj.name.encode("utf-8")
        component_list[num + 2].component_addr = \
            REGISTER_SCRIPT_FILE_NAME.encode("utf-8")

    # The key path must be bytes: ctypes marshals a str argument as
    # wchar_t*, not as the char* a C path parameter expects.
    if OPTIONS_MANAGER.private_key == ON_SERVER:
        private_key = "./update_package.py".encode("utf-8")
    else:
        private_key = OPTIONS_MANAGER.private_key.encode("utf-8")

    lib.CreatePackage(
        ctypes.pointer(head_list), component_list, file_save_patch,
        private_key)
    return file_obj
409
410
def build_update_package(no_zip, update_package, prelude_script,
                         verse_script, refrain_script, ending_script):
    """
    Create the update package file.
    update.bin is always generated first. With no_zip it is copied to
    <product>_ota_<HHMMSS>.bin in the output directory; otherwise the
    scripts and the build tools package are generated and packed together
    with update.bin into <product>_ota_<HHMMSS>.zip.
    :param no_zip: no zip
    :param update_package: update package path
    :param prelude_script: prelude object
    :param verse_script: verse object
    :param refrain_script: refrain object
    :param ending_script: ending object
    :return: True on success, False if the build tools package cannot be
             created.
    """
    # Imported here so that this function does not depend on an edit of
    # the module level imports.
    import shutil

    update_bin_obj, lib = create_update_bin()
    OPTIONS_MANAGER.update_bin_obj = update_bin_obj

    update_file_name = ''.join(
        [OPTIONS_MANAGER.product, '_ota_',
         time.strftime("%H%M%S", time.localtime())])
    if no_zip:
        update_package_path = os.path.join(
            update_package, '%s.bin' % update_file_name)
        OPTIONS_MANAGER.update_package_file_path = update_package_path
        # Stream the copy: update.bin holds whole images and should not be
        # read into memory in one piece.
        with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f, \
                open(update_package_path, 'wb') as w_f:
            shutil.copyfileobj(r_f, w_f)
        return True

    update_package_path = os.path.join(
        update_package, '%s.zip' % update_file_name)
    OPTIONS_MANAGER.update_package_file_path = update_package_path

    create_script(prelude_script, verse_script,
                  refrain_script, ending_script)

    build_tools_zip_obj = create_build_tools_zip(lib)
    if build_tools_zip_obj is False:
        UPDATE_LOGGER.print_log(
            "Create build tools zip failed!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj

    head_list = PkgHeader()
    if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256:
        # PKG_DIGEST_TYPE_SHA384   3,use sha384
        head_list.digest_method = 3
    else:
        # PKG_DIGEST_TYPE_SHA256   2,use sha256
        head_list.digest_method = 2
    if OPTIONS_MANAGER.private_key == ON_SERVER:
        head_list.sign_method = 0
    elif OPTIONS_MANAGER.signing_algorithm == "ECC":
        # signing algorithm use ECC
        head_list.sign_method = SignMethod.ECC.value
    else:
        # signing algorithm use RSA
        head_list.sign_method = SignMethod.RSA.value
    head_list.pkg_type = 2
    head_list.pkg_flags = 0
    head_list.entry_count = 2

    # The zip holds exactly two entries: update.bin and the build tools.
    pkg_components = PkgComponent * 2
    component_list = pkg_components()
    component_list[0].file_path = \
        OPTIONS_MANAGER.update_bin_obj.name.encode("utf-8")
    component_list[0].component_addr = 'update.bin'.encode("utf-8")
    component_list[1].file_path = \
        OPTIONS_MANAGER.build_tools_zip_obj.name.encode("utf-8")
    component_list[1].component_addr = \
        BUILD_TOOLS_FILE_NAME.encode("utf-8")

    sign_info = SignInfo()
    # The key path must be bytes: ctypes marshals a str argument as
    # wchar_t*, not as the char* a C path parameter expects.
    if OPTIONS_MANAGER.private_key == ON_SERVER:
        private_key = "./update_package.py".encode("utf-8")
    else:
        private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
    lib = get_lib_api()
    lib_l1 = get_lib_api(is_l2=False)
    if OPTIONS_MANAGER.not_l2:
        lib_l1.CreatePackageWithSignInfo(
            ctypes.pointer(head_list), component_list,
            update_package_path.encode("utf-8"),
            private_key, ctypes.pointer(sign_info))
    else:
        lib.CreatePackage(
            ctypes.pointer(head_list), component_list,
            update_package_path.encode("utf-8"),
            OPTIONS_MANAGER.private_key.encode("utf-8"))

    if OPTIONS_MANAGER.private_key == ON_SERVER:
        # NOTE(review): "%x" does not zero-pad and covers the whole
        # hash_code buffer - kept as is, confirm the format the signing
        # service expects.
        hash_code = "".join(["%x" % each for each in sign_info.hash_code])
        # Sign the .zip that was just created, not the temporary
        # update.bin that is already packed inside it.
        signing_package(update_package_path,
                        OPTIONS_MANAGER.hash_algorithm, hash_code,
                        package_type='.zip')

    UPDATE_LOGGER.print_log(".zip package signing success!")
    UPDATE_LOGGER.print_log(
        "Create update package .zip complete! path: %s" %
        update_package_path)
    return True
510
511
def get_hash_content(file_path, hash_algorithm):
    """
    Get the hash value of a file by running the command line tool that
    LINUX_HASH_ALGORITHM_DICT maps to the hash algorithm.
    :param file_path : file path
    :param hash_algorithm: hash algorithm
    :return hash_content: hash value as text,
                          None if the hash algorithm is not supported
    :raise RuntimeError: the file does not exist or the tool output does
                         not start with a hash of the expected length
    """
    try:
        cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path]
    except KeyError:
        UPDATE_LOGGER.print_log(
            "Unsupported hash algorithm! %s" % hash_algorithm,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return None
    if not os.path.exists(file_path):
        UPDATE_LOGGER.print_log(
            "%s failed!" % LINUX_HASH_ALGORITHM_DICT.get(hash_algorithm),
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError
    process_obj = subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() drains and closes the pipe while waiting; wait()
    # followed by read() can block once the pipe buffer is full.
    output, _ = process_obj.communicate()
    # The hash is the first space separated token. Decode only that token
    # so that bytes of the file path can never cause a decode error.
    hash_content = \
        output.split(b' ')[0].decode(encoding='gbk', errors='replace')
    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
        UPDATE_LOGGER.print_log(
            "Get hash content failed! The length of the hash_content is 0!",
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError
    if process_obj.returncode == 0:
        UPDATE_LOGGER.print_log(
            "Get hash content success! path: %s" % file_path)
    return hash_content
545