• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2022 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""
17Description : Generate the update.bin file
18"""
19import os
20import struct
21import hashlib
22import subprocess
23from log_exception import UPDATE_LOGGER
24from utils import OPTIONS_MANAGER
25from cryptography.hazmat.primitives import serialization
26from cryptography.hazmat.primitives import hashes
27from cryptography.hazmat.backends import default_backend
28from cryptography.hazmat.primitives.asymmetric import padding
29
# ---------------------------------------------------------------------------
# Binary layout constants for update.bin. All sizes are in bytes.
# NOTE: the misspelled names COMPONEBT_* and BLCOK_SIZE are kept as-is;
# they are module-level names that other code may reference.
# ---------------------------------------------------------------------------

# Offset at which the component-info table starts:
# 3 TLV headers (3 * 4) + package header (136) + time block (32) = 180.
UPGRADE_FILE_HEADER_LEN = 180
# Reserved area written right after the component-info table
# (create_package() writes head_list.describe_package_id there).
UPGRADE_RESERVE_LEN = 16
# Signature lengths that sign() distinguishes between; a 384-byte signature
# is stored after the first 256 bytes of the signature area.
# NOTE(review): presumably the sizes of 2048-bit / 3072-bit RSA signatures,
# despite the "SHA" in the names - confirm.
SIGN_SHA256_LEN = 256
SIGN_SHA384_LEN = 384
# Total size of the zero-filled signature area reserved in the package.
UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN
# Size of one TLV header packed with TLV_FMT ("2H": type + length).
TLV_SIZE = 4
# Size of UPGRADE_PKG_HEADER_FMT ("2I64s64s": 4 + 4 + 64 + 64).
UPGRADE_PKG_HEADER_SIZE = 136
# Size of UPGRADE_PKG_TIME_FMT ("16s16s": 16 + 16).
UPGRADE_PKG_TIME_SIZE = 32
# Size of one component-info record:
# address (16) + info (5) + version (10) + sizes (8) + digest (32) = 71.
UPGRADE_COMPINFO_SIZE = 71
# Same record when OPTIONS_MANAGER.not_l2 is false: address is 32 bytes -> 87.
UPGRADE_COMPINFO_SIZE_L2 = 87
# Width of the component address field (not_l2 / L2 variants).
COMPONEBT_ADDR_SIZE = 16
COMPONEBT_ADDR_SIZE_L2 = 32
# Size of COMPONENT_INFO_FMT ("H3B": 2 + 3).
COMPONENT_INFO_FMT_SIZE = 5
# Width of the component version field.
COMPONENT_VERSION_SIZE = 10
# Size of COMPONENT_SIZE_FMT ("iI": 4 + 4).
COMPONENT_SIZE_FMT_SIZE = 8
# Width of the component digest field.
COMPONENT_DIGEST_SIZE = 32
# Chunk size used when reading the package back to compute its hash.
BLCOK_SIZE = 8192
# TLV type written for the package header (not_l2 / L2 variants).
HEADER_TLV_TYPE = 0x11
HEADER_TLV_TYPE_L2 = 0x01
# signature algorithm
SIGN_ALGO_RSA = "SHA256withRSA"
SIGN_ALGO_PSS = "SHA256withPSS"

# struct format characters used below. In addition to the ones listed in the
# string that follows, COMPONENT_SIZE_FMT uses "i" (signed int).
"""
Format
H: unsigned short
I: unsigned int
B: unsigned char
s: char[]
"""
# TLV header: type, length.
TLV_FMT = "2H"
# Package header: pkg_info_length, update_file_version,
# product_update_id[64], software_version[64].
UPGRADE_PKG_HEADER_FMT = "2I64s64s"
# Time block: date[16], time[16].
UPGRADE_PKG_TIME_FMT = "16s16s"
# Component info: id, res_type, flags, type.
COMPONENT_INFO_FMT = "H3B"
# Component sizes: size, original_size.
COMPONENT_SIZE_FMT = "iI"
65
66
class CreatePackage(object):
    """
    Create the update.bin file

    File layout produced by create_package(), in order:
        1. package info (header TLV, header, time TLV, time, component TLV),
           UPGRADE_FILE_HEADER_LEN bytes
        2. component-info table, entry_count * upgrade_compinfo_size bytes
        3. reserved area (describe_package_id), UPGRADE_RESERVE_LEN bytes
        4. signature area, UPGRADE_SIGNATURE_LEN bytes (zero-filled until
           sign() is called)
        5. component data, back to back

    sign() relies on the offsets computed by create_package() on the same
    instance, so create_package() must be called first.
    """

    # Chunk size used when copying component images into the package, so a
    # large image never has to be held in memory as a whole.
    _COPY_CHUNK_SIZE = 1024 * 1024

    def __init__(self, head_list, component_list, save_path, key_path):
        """
        head_list: package header object (entry_count, update_file_version,
            product_update_id, software_version, date, time,
            describe_package_id are read from it)
        component_list: indexable collection of component descriptors
        save_path: path of the update.bin file to create
        key_path: path of the PEM private key used by sign()
        """
        self.head_list = head_list
        self.component_list = component_list
        self.save_path = save_path
        self.key_path = key_path
        # Running write positions inside the package file.
        self.compinfo_offset = 0
        self.component_offset = 0
        self.sign_offset = 0

        # Record size and header TLV type differ between the two variants.
        if OPTIONS_MANAGER.not_l2:
            self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE
            self.header_tlv_type = HEADER_TLV_TYPE
        else:
            self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2
            self.header_tlv_type = HEADER_TLV_TYPE_L2

    def verify_param(self):
        """
        Check the constructor arguments.
        return: True if the parameters look usable, False otherwise
        """
        required = (self.head_list, self.component_list,
                    self.save_path, self.key_path)
        if any(item is None for item in required):
            UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG)
            return False
        # The key must be a file, not a directory.
        if os.path.isdir(self.key_path):
            UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG)
            return False
        # NOTE(review): __sizeof__() reports the object's memory size, so this
        # does not detect an empty list; kept unchanged because the concrete
        # types of head_list/component_list are not visible here.
        if self.head_list.__sizeof__() <= 0 or self.component_list.__sizeof__() <= 0:
            UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG)
            return False
        return True

    def write_pkginfo(self, package_file):
        """
        Write the package info block at the current position of package_file.
        return: True on success, False if packing or writing fails
        """
        compinfo_total = self.upgrade_compinfo_size * self.head_list.entry_count
        try:
            # Header TLV: type is self.header_tlv_type
            # (HEADER_TLV_TYPE or HEADER_TLV_TYPE_L2).
            header_tlv = struct.pack(
                TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE)
            pkg_info_length = \
                UPGRADE_RESERVE_LEN + 3 * TLV_SIZE + \
                UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \
                compinfo_total
            upgrade_pkg_header = struct.pack(
                UPGRADE_PKG_HEADER_FMT, pkg_info_length,
                self.head_list.update_file_version,
                self.head_list.product_update_id,
                self.head_list.software_version)

            # Type is 2 for time in TLV format
            time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE)
            upgrade_pkg_time = struct.pack(
                UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time)

            # Type is 5 for component in TLV format. The length is packed as
            # an unsigned short, so a table larger than 65535 bytes raises
            # struct.error and is reported as "Pack fail!".
            component_tlv = struct.pack(TLV_FMT, 0x05, compinfo_total)
        except struct.error:
            UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False

        # write pkginfo
        pkginfo = b"".join((header_tlv, upgrade_pkg_header,
                            time_tlv, upgrade_pkg_time, component_tlv))
        try:
            package_file.write(pkginfo)
        except IOError:
            UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        UPDATE_LOGGER.print_log("Write package header complete")
        return True

    def _write_compinfo_field(self, package_file, data, field_size):
        """
        Write one field of a component-info record at compinfo_offset and
        advance the offset by the fixed field width (not by len(data)).
        """
        package_file.seek(self.compinfo_offset)
        package_file.write(data)
        self.compinfo_offset += field_size

    def write_component_info(self, component, package_file):
        """
        Write one component-info record (address, info, version, sizes,
        digest) at compinfo_offset.
        return: True on success, False if packing or writing fails
        """
        UPDATE_LOGGER.print_log("component information  StartOffset:%s"\
            % self.compinfo_offset)
        if OPTIONS_MANAGER.not_l2:
            component_addr_size = COMPONEBT_ADDR_SIZE
        else:
            component_addr_size = COMPONEBT_ADDR_SIZE_L2

        try:
            self._write_compinfo_field(
                package_file, component.component_addr, component_addr_size)

            component_info = struct.pack(
                COMPONENT_INFO_FMT, component.id, component.res_type,
                component.flags, component.type)
            self._write_compinfo_field(
                package_file, component_info, COMPONENT_INFO_FMT_SIZE)

            self._write_compinfo_field(
                package_file, component.version, COMPONENT_VERSION_SIZE)

            component_size = struct.pack(
                COMPONENT_SIZE_FMT, component.size, component.original_size)
            self._write_compinfo_field(
                package_file, component_size, COMPONENT_SIZE_FMT_SIZE)

            self._write_compinfo_field(
                package_file, component.digest, COMPONENT_DIGEST_SIZE)
        except (struct.error, IOError):
            return False
        return True

    def write_component(self, component, package_file):
        """
        Copy the file at component.file_path into the package at
        component_offset and advance component_offset by the bytes written.
        return: True on success, False on an I/O error
        """
        UPDATE_LOGGER.print_log("Add component to package  StartOffset:%s"\
            % self.component_offset)
        component_len = 0
        try:
            with open(component.file_path, "rb") as component_file:
                package_file.seek(self.component_offset)
                # Stream in chunks instead of reading the whole image at once.
                while True:
                    chunk = component_file.read(self._COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    package_file.write(chunk)
                    component_len += len(chunk)
        except IOError:
            return False
        # Only advance once the whole component was written successfully.
        self.component_offset += component_len
        UPDATE_LOGGER.print_log("Write component complete  ComponentSize:%s"\
            % component_len)
        return True

    def calculate_hash(self, package_file):
        """
        Compute the SHA-256 digest of the first component_offset bytes of
        package_file (reads from offset 0 in BLCOK_SIZE chunks).
        return: the raw digest bytes
        """
        hash_sha256 = hashlib.sha256()
        remain_len = self.component_offset

        package_file.seek(0)
        while remain_len > 0:
            chunk = package_file.read(min(BLCOK_SIZE, remain_len))
            if not chunk:
                # File is shorter than expected; hash what is there.
                break
            hash_sha256.update(chunk)
            remain_len -= len(chunk)
        return hash_sha256.digest()

    def _load_private_key(self):
        """Load the unencrypted PEM private key stored at key_path."""
        with open(self.key_path, 'rb') as f_r:
            key_data = f_r.read()
        return serialization.load_pem_private_key(
            key_data,
            password=None,
            backend=default_backend())

    def _sign_with_padding(self, digset, pad):
        """
        Sign digset with the private key using the given padding.
        return: the signature bytes, or False if the key cannot be read,
            parsed (including an encrypted key, which raises TypeError when
            no password is supplied) or used
        """
        try:
            private_key = self._load_private_key()
            # NOTE(review): sign() hashes its input with SHA-256 itself, so
            # the package digest is hashed a second time here. Presumably the
            # verifier expects exactly this - confirm before changing.
            return private_key.sign(digset, pad, hashes.SHA256())
        except (OSError, ValueError, TypeError):
            return False

    def sign_digest_with_pss(self, digset):
        """
        Sign digset using PSS padding (MGF1/SHA-256, maximum salt length).
        return: the signature bytes, or False on failure
        """
        pad = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH)
        return self._sign_with_padding(digset, pad)

    def sign_digest(self, digset):
        """
        Sign digset using PKCS#1 v1.5 padding.
        return: the signature bytes, or False on failure
        """
        return self._sign_with_padding(digset, padding.PKCS1v15())

    def sign(self, sign_algo):
        """
        Hash the package at save_path, sign the digest and store the
        signature in the signature area reserved by create_package().
        sign_algo: SIGN_ALGO_RSA or SIGN_ALGO_PSS
        return: True on success, False otherwise
        """
        with open(self.save_path, "rb+") as package_file:
            # calculate hash for .bin package
            digest = self.calculate_hash(package_file)
            if not digest:
                UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
                    log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            # sign .bin package
            if sign_algo == SIGN_ALGO_RSA:
                signature = self.sign_digest(digest)
            elif sign_algo == SIGN_ALGO_PSS:
                signature = self.sign_digest_with_pss(digest)
            else:
                UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False
            if not signature:
                UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            # A 384-byte signature goes into the second slot of the signature
            # area. Use a local offset so that self.sign_offset keeps pointing
            # at the start of the area and repeated calls stay correct.
            write_offset = self.sign_offset
            if len(signature) == SIGN_SHA384_LEN:
                write_offset += SIGN_SHA256_LEN

            # write signed .bin package
            package_file.seek(write_offset)
            package_file.write(signature)
            UPDATE_LOGGER.print_log(
                ".bin package signing success! SignOffset: %s" % write_offset)
            return True

    def create_package(self):
        """
        Create the update.bin file
        return: update package creation result
        """
        if not self.verify_param():
            UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG)
            return False
        try:
            # O_TRUNC is required: os.fdopen() on an existing descriptor does
            # not truncate, so without it a previous, larger file at save_path
            # would leave stale bytes behind the new package.
            package_fd = os.open(
                self.save_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
        except OSError:
            UPDATE_LOGGER.print_log(
                "Open package file failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        with os.fdopen(package_fd, "wb+") as package_file:
            # Add information to package
            if not self.write_pkginfo(package_file):
                UPDATE_LOGGER.print_log(
                    "Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False
            # Add component to package
            entry_count = self.head_list.entry_count
            self.compinfo_offset = UPGRADE_FILE_HEADER_LEN
            # Component data starts after the info table, the reserved area
            # and the signature area.
            self.component_offset = UPGRADE_FILE_HEADER_LEN + \
                entry_count * self.upgrade_compinfo_size + \
                UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN
            for i in range(0, entry_count):
                component = self.component_list[i]
                UPDATE_LOGGER.print_log("Add component %s"
                    % component.component_addr)
                if not self.write_component_info(component, package_file):
                    UPDATE_LOGGER.print_log("write component info failed: %s"
                        % component.component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False
                if not self.write_component(component, package_file):
                    UPDATE_LOGGER.print_log("write component failed: %s"
                        % component.component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False
            try:
                # Add descriptPackageId to package (reserved area right after
                # the component-info table)
                package_file.seek(self.compinfo_offset)
                package_file.write(self.head_list.describe_package_id)
            except IOError:
                UPDATE_LOGGER.print_log(
                    "Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False
            try:
                # Reserve the signature area with zeros; sign() fills it in
                # later.
                self.sign_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN
                package_file.seek(self.sign_offset)
                sign_buffer = bytes(UPGRADE_SIGNATURE_LEN)
                package_file.write(sign_buffer)
            except IOError:
                UPDATE_LOGGER.print_log(
                    "Add Sign failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False
        UPDATE_LOGGER.print_log("Write update package complete")
        return True