• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import binascii
17import copy
18import os
19import subprocess
20import tempfile
21import time
22import collections as collect
23import enum
24import ctypes
25import zipfile
26
27from log_exception import UPDATE_LOGGER
28from script_generator import create_script
29from utils import sign_package
30from utils import HASH_CONTENT_LEN_DICT
31from utils import OPTIONS_MANAGER
32from utils import REGISTER_SCRIPT_FILE_NAME
33from utils import ON_SERVER
34from utils import SCRIPT_KEY_LIST
35from utils import EXTEND_OPTIONAL_COMPONENT_LIST
36from utils import COMPONENT_INFO_INNIT
37from utils import UPDATE_EXE_FILE_NAME
38from utils import TOTAL_SCRIPT_FILE_NAME
39from utils import EXTEND_COMPONENT_LIST
40from utils import LINUX_HASH_ALGORITHM_DICT
41from utils import BUILD_TOOLS_FILE_NAME
42from utils import SIGN_PACKAGE_EVENT
43from create_update_package import CreatePackage
44from create_update_package import SIGN_ALGO_RSA
45from create_update_package import SIGN_ALGO_PSS
46
# Value stored in the ``flags`` field of every component record.
IS_DEL = 0
# Signing length that selects the SHA-256 digest method in the header;
# any other configured signing length selects SHA-384.
SIGNING_LENGTH_256 = 256
# Number of bytes reserved for a component digest.
DIGEST_LEN = 32
# Maximum hash length; the hash buffer is sized one byte larger than this.
HASH_VALUE_MAX_LEN = 128
51
52
class SignMethod(enum.Enum):
    """Numeric identifiers of the signing algorithms written to the header."""

    RSA = 1
    ECC = 2
56
57
class PkgHeader(ctypes.Structure):
    """ctypes layout of the package header handed to the package builder."""

    _fields_ = [
        # Single-byte identifiers and flags.
        ("digest_method", ctypes.c_ubyte),
        ("sign_method", ctypes.c_ubyte),
        ("pkg_type", ctypes.c_ubyte),
        ("pkg_flags", ctypes.c_ubyte),
        # Integer counters / versions.
        ("entry_count", ctypes.c_int),
        ("update_file_version", ctypes.c_int),
        # Byte-string (char *) descriptive fields.
        ("product_update_id", ctypes.c_char_p),
        ("software_version", ctypes.c_char_p),
        ("date", ctypes.c_char_p),
        ("time", ctypes.c_char_p),
        ("describe_package_id", ctypes.c_char_p),
    ]
70
71
class PkgComponent(ctypes.Structure):
    """ctypes layout of one component record of the package."""

    _fields_ = [
        # Fixed-size binary digest of the component file.
        ("digest", ctypes.c_ubyte * DIGEST_LEN),
        # Byte-string (char *) fields.
        ("file_path", ctypes.c_char_p),
        ("component_addr", ctypes.c_char_p),
        ("version", ctypes.c_char_p),
        # Integer fields.
        ("size", ctypes.c_int),
        ("id", ctypes.c_int),
        ("original_size", ctypes.c_int),
        # Single-byte fields.
        ("res_type", ctypes.c_ubyte),
        ("type", ctypes.c_ubyte),
        ("flags", ctypes.c_ubyte),
    ]
83
84
class SignInfo(ctypes.Structure):
    """ctypes layout of the signature information record."""

    _fields_ = [
        ("sign_offset", ctypes.c_int),
        ("hash_len", ctypes.c_int),
        # Hash buffer: one byte more than the maximum hash length.
        ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1)),
    ]
89
90
def create_update_bin():
    """
    Call the interface to generate and sign the update.bin file.

    Components are ordered as: extended components (only when
    OPTIONS_MANAGER.not_l2 is false; the optional extended components are
    added when a partition file object is present), then the incremental
    images, then the full images.

    :return update_bin_obj: Update file object (a named temporary file).
                            If an error occurs, return False.
    """
    update_bin_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")

    all_image_file_obj_list = \
        OPTIONS_MANAGER.incremental_image_file_obj_list + \
        OPTIONS_MANAGER.full_image_file_obj_list

    image_name_list = \
        OPTIONS_MANAGER.incremental_img_list + OPTIONS_MANAGER.full_img_list
    if OPTIONS_MANAGER.not_l2:
        all_image_name = image_name_list
    elif OPTIONS_MANAGER.partition_file_obj is not None:
        all_image_name = \
            EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + \
            image_name_list
    else:
        all_image_name = EXTEND_COMPONENT_LIST + image_name_list

    # Re-order the component information to follow all_image_name; names
    # without an entry map to None and are filled in by get_component_list.
    source_component_dict = OPTIONS_MANAGER.component_info_dict
    sort_component_dict = collect.OrderedDict()
    for each_image_name in all_image_name:
        sort_component_dict[each_image_name] = \
            source_component_dict.get(each_image_name)
    component_dict = copy.deepcopy(sort_component_dict)

    head_list = get_head_list(
        len(component_dict), OPTIONS_MANAGER.head_info_list)
    component_list = get_component_list(
        all_image_file_obj_list, component_dict)
    # get_component_list signals failure with a non-array value. An explicit
    # identity test is used because an empty ctypes array is also falsy.
    if component_list is None or component_list is False:
        UPDATE_LOGGER.print_log(
            "Get component list failed!", UPDATE_LOGGER.ERROR_LOG)
        update_bin_obj.close()
        return False

    save_patch = update_bin_obj.name.encode("utf-8")
    sign_algo = SIGN_ALGO_PSS if OPTIONS_MANAGER.not_l2 else SIGN_ALGO_RSA

    # create bin
    package = CreatePackage(head_list, component_list, save_patch,
                            OPTIONS_MANAGER.private_key)
    if not package.create_package():
        UPDATE_LOGGER.print_log(
            "Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
        # Closing the temporary file also removes it from disk.
        update_bin_obj.close()
        return False
    if not package.sign(sign_algo):
        UPDATE_LOGGER.print_log(
            ".bin package signing failed", UPDATE_LOGGER.ERROR_LOG)
        update_bin_obj.close()
        return False

    UPDATE_LOGGER.print_log(
        "Create update package .bin complete! path: %s" % update_bin_obj.name)
    return update_bin_obj
156
157
def get_component_list(all_image_file_obj_list, component_dict):
    """
    Get the list of component information according to
    the component information structure.

    The first entries of component_dict are matched, by position, with the
    extended component files (version list, board list and, when present,
    the partition file); the remaining entries are matched, by position,
    with all_image_file_obj_list.

    :param all_image_file_obj_list: all image object file list
    :param component_dict: Component information content dict
    :return component_list: ctypes array of PkgComponent, one per entry of
                            component_dict. If the digest of a file cannot
                            be obtained, return False.
    """
    pkg_components = PkgComponent * len(component_dict)
    component_list = pkg_components()
    if OPTIONS_MANAGER.not_l2:
        extend_component_list = []
        extend_path_list = []
    elif OPTIONS_MANAGER.partition_file_obj is not None:
        extend_component_list = \
            EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
        extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                            OPTIONS_MANAGER.board_list_file_path,
                            OPTIONS_MANAGER.partition_file_obj.name]
    else:
        extend_component_list = EXTEND_COMPONENT_LIST
        extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                            OPTIONS_MANAGER.board_list_file_path]
    extend_count = len(extend_component_list)

    for idx, (key, component) in enumerate(component_dict.items()):
        if idx < extend_count:
            file_path = extend_path_list[idx]
        else:
            file_path = all_image_file_obj_list[idx - extend_count].name
        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
        if not digest:
            return False
        if component is None:
            # No configured information for this name: start from the
            # default record and fill in the component name.
            component = copy.copy(COMPONENT_INFO_INNIT)
            component[0] = key
        file_size = os.path.getsize(file_path)

        entry = component_list[idx]
        # The hexadecimal digest text is converted to raw bytes; only the
        # first DIGEST_LEN bytes fit into the fixed-size digest field.
        entry.digest = (ctypes.c_ubyte * DIGEST_LEN).from_buffer_copy(
            binascii.a2b_hex(digest.encode('utf-8')))
        entry.file_path = file_path.encode("utf-8")
        if not OPTIONS_MANAGER.not_l2:
            entry.component_addr = ('/%s' % component[0]).encode("utf-8")
        else:
            entry.component_addr = ('%s' % component[0]).encode("utf-8")
        entry.version = component[4].encode("utf-8")
        entry.size = file_size
        entry.id = int(component[1])
        # NOTE(review): component[3] is compared with the integer 1 here but
        # converted with int() below; if the values are strings this branch
        # is never taken - confirm the intended behaviour.
        if component[3] == 1:
            entry.original_size = file_size
        else:
            entry.original_size = 0
        entry.res_type = int(component[2])
        entry.type = int(component[3])
        entry.flags = IS_DEL
    return component_list
218
219
def get_head_list(component_count, head_value_list):
    """
    Fill a PkgHeader structure for the update.bin package.

    :param component_count: number of components
    :param head_value_list: list of header values; index 0 is the update
                            file version, indexes 1-4 are the product update
                            id, software version, date and time
    :return head_list: populated PkgHeader structure
    """
    header = PkgHeader()

    # PKG_DIGEST_TYPE_SHA256 is 2, PKG_DIGEST_TYPE_SHA384 is 3.
    sha256_selected = OPTIONS_MANAGER.signing_length == SIGNING_LENGTH_256
    header.digest_method = 2 if sha256_selected else 3

    if OPTIONS_MANAGER.private_key == ON_SERVER:
        header.sign_method = 0
    elif OPTIONS_MANAGER.signing_algorithm == "ECC":
        # signing algorithm use ECC
        header.sign_method = SignMethod.ECC.value
    else:
        # signing algorithm use RSA
        header.sign_method = SignMethod.RSA.value

    header.pkg_type = 1
    header.pkg_flags = 1 if OPTIONS_MANAGER.not_l2 else 0
    header.entry_count = component_count
    header.update_file_version = int(head_value_list[0])

    # The text header values follow the version, in this fixed order.
    text_field_names = (
        "product_update_id", "software_version", "date", "time")
    for position, field_name in enumerate(text_field_names, start=1):
        setattr(header, field_name, head_value_list[position].encode("utf-8"))

    header.describe_package_id = \
        ctypes.c_char_p("update/info.bin".encode())
    return header
256
257
def get_tools_component_list(count, opera_script_dict):
    """
    Build the component records for the script files of the tools package.

    :param count: number of component records to allocate
    :param opera_script_dict: dict mapping each script file path to the
                              name it is stored under
    :return: tuple (component_list, component_num), where component_list is
             the ctypes array of PkgComponent and component_num is the
             number of records that were filled in
    """
    component_list = (PkgComponent * count)()
    component_num = 0
    for script_path, script_name in opera_script_dict.items():
        component_list[component_num].file_path = script_path.encode("utf-8")
        component_list[component_num].component_addr = \
            script_name.encode("utf-8")
        component_num += 1
    return component_list, component_num
277
278
def get_tools_head_list(component_count):
    """
    Build the PkgHeader structure of the tools package.

    :param component_count: number of components
    :return head_list: PkgHeader with package type 2, no digest or signing
                       method, no flags and the given entry count
    """
    return PkgHeader(
        digest_method=0,
        sign_method=0,
        pkg_type=2,
        pkg_flags=0,
        entry_count=component_count)
292
293
def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
    """
    Placeholder for signing the update package on a server.

    The vendor is required to implement its own signing service
    interface, for example:
    ip = ""
    user_name = ""
    pass_word = ""
    signe_jar = ""
    signing_config = [signe_jar, ip, user_name, pass_word,
                      hash_code, hash_algorithm]
    cmd = ' '.join(signing_config)
    subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    This default implementation only logs the request.

    :param package_path: update package file path
    :param hash_algorithm: hash algorithm
    :param hash_code: hash code
    :return: the signing content as bytes (empty in this implementation)
    """
    log_message = "Signing %s, hash algorithm is: %s, " \
                  "Signing hash code: %s" % \
                  (package_path, hash_algorithm, hash_code)
    UPDATE_LOGGER.print_log(log_message)
    return "".encode()
317
318
def create_build_tools_zip():
    """
    Create the build tools zip file.

    The archive contains every operation script recorded in
    OPTIONS_MANAGER.opera_script_file_name_dict, the updater binary taken
    from the target package directory, the total script and, when one is
    set, the register script.

    :return: temporary file object holding the zip archive.
             If the script dict is empty or the updater binary is missing,
             return False.
    """
    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
    # A dict in which every script key maps to an empty list means that no
    # script was generated at all.
    empty_script_dict = {each: [] for each in SCRIPT_KEY_LIST}
    if opera_script_file_name_dict == empty_script_dict:
        UPDATE_LOGGER.print_log(
            "Script dict is null!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    # Map the path of each script file to its name inside the archive.
    opera_script_dict = {}
    for each_value in opera_script_file_name_dict.values():
        for each in each_value:
            opera_script_dict[each[1].name] = each[0]

    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
    update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
                                   UPDATE_EXE_FILE_NAME)
    if not os.path.exists(update_exe_path):
        UPDATE_LOGGER.print_log(
            "updater_binary file does not exist!path: %s" % update_exe_path,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    file_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
    # The context manager closes the archive even when a write fails.
    with zipfile.ZipFile(
            file_obj.name, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        # add opera_script to build_tools.zip
        for script_path, script_name in opera_script_dict.items():
            zip_file.write(script_path, script_name)

        # add update_binary to build_tools.zip
        zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME)

        # add loadScript to build_tools.zip
        zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME)

        if register_script_file_obj is not None:
            zip_file.write(
                register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME)

    return file_obj
376
377
def build_update_package(no_zip, update_package, prelude_script,
                         verse_script, refrain_script, ending_script):
    """
    Create the update package file.

    When no_zip is false, update.bin and the build tools zip are packed
    into an unsigned zip, the package is signed, and the unsigned zip is
    removed after signing succeeds. When no_zip is true, update.bin is
    copied to the output directory as a .bin file.

    :param no_zip: no zip
    :param update_package: update package path
    :param prelude_script: prelude object
    :param verse_script: verse object
    :param refrain_script: refrain object
    :param ending_script: ending object
    :return: True on success. If an error occurs, return False.
    """
    update_bin_obj = create_update_bin()
    if not update_bin_obj:
        return False
    OPTIONS_MANAGER.update_bin_obj = update_bin_obj

    if OPTIONS_MANAGER.sd_card:
        package_type = "sd"
    elif OPTIONS_MANAGER.source_package:
        package_type = "diff"
    else:
        package_type = "full"
    if OPTIONS_MANAGER.not_l2:
        update_file_name = ''.join(
            ["updater_",
             OPTIONS_MANAGER.target_package_version.replace(" ", "_")])
    else:
        update_file_name = ''.join(["updater_", package_type])

    if no_zip:
        update_package_path = os.path.join(
            update_package, '%s.bin' % update_file_name)
        if os.path.exists(update_package_path):
            os.remove(update_package_path)
        OPTIONS_MANAGER.update_package_file_path = update_package_path
        # Copy in chunks so that a large update.bin is never held in
        # memory as a whole.
        copy_chunk_size = 1024 * 1024
        with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f, \
                open(update_package_path, 'wb') as w_f:
            for chunk in iter(lambda: r_f.read(copy_chunk_size), b''):
                w_f.write(chunk)
        return True

    update_package_path = os.path.join(
        update_package, '%s_unsigned.zip' % update_file_name)
    OPTIONS_MANAGER.update_package_file_path = update_package_path

    create_script(prelude_script, verse_script,
                  refrain_script, ending_script)

    build_tools_zip_obj = create_build_tools_zip()
    if build_tools_zip_obj is False:
        UPDATE_LOGGER.print_log(
            "Create build tools zip failed!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj

    # The context manager closes the archive even when a write fails.
    with zipfile.ZipFile(update_package_path, 'w', zipfile.ZIP_DEFLATED,
                         allowZip64=True) as zip_file:
        # add update.bin to update package
        zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
        # add build_tools.zip to update package
        zip_file.write(
            OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME)

    signed_package = os.path.join(
        update_package, "%s.zip" % update_file_name)
    OPTIONS_MANAGER.signed_package = signed_package
    if os.path.exists(signed_package):
        os.remove(signed_package)

    # A handler registered for the sign event takes precedence over the
    # default signing routine.
    sign_ota_package = \
        OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
    if sign_ota_package:
        sign_result = sign_ota_package()
    else:
        sign_result = sign_package()

    if not sign_result:
        UPDATE_LOGGER.print_log(
            "Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
        return False
    if os.path.exists(update_package_path):
        os.remove(update_package_path)
    return True
461
462
def get_hash_content(file_path, hash_algorithm):
    """
    Run the external hash command configured for hash_algorithm on the
    file and return the hash value it prints.

    :param file_path: file path
    :param hash_algorithm: hash algorithm; must be a key of
                           LINUX_HASH_ALGORITHM_DICT
    :return hash_content: hash value as text.
                          If the hash algorithm is unsupported, return False.
    :raise RuntimeError: if the file does not exist or the command output
                         does not have the expected hash length
    """
    try:
        cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path]
    except KeyError:
        UPDATE_LOGGER.print_log(
            "Unsupported hash algorithm! %s" % hash_algorithm,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    if not os.path.exists(file_path):
        UPDATE_LOGGER.print_log(
            "%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm],
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError("File does not exist: %s" % file_path)
    process_obj = subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() drains the pipe while waiting for the process and
    # closes it afterwards; calling wait() before reading can block once
    # the pipe buffer is full.
    output, _ = process_obj.communicate()
    # The hash value is the first space-separated token of the output.
    hash_content = output.decode(encoding='gbk').split(' ')[0]
    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
        UPDATE_LOGGER.print_log(
            "Get hash content failed! The length of the hash_content is 0!",
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError(
            "Unexpected hash content length for file: %s" % file_path)
    if process_obj.returncode == 0:
        UPDATE_LOGGER.print_log(
            "Get hash content success! path: %s" % file_path)
    return hash_content
496