• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2022 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""
17Description : Generate the update.bin file
18"""
19import os
20import struct
21import hashlib
22import subprocess
23from log_exception import UPDATE_LOGGER
24from utils import OPTIONS_MANAGER
25from create_hashdata import HashType
26from create_hashdata import CreateHash
27from create_hashdata import HASH_BLOCK_SIZE
28from cryptography.hazmat.primitives import serialization
29from cryptography.hazmat.primitives import hashes
30from cryptography.hazmat.backends import default_backend
31from cryptography.hazmat.primitives.asymmetric import padding
32
# struct format strings used when packing the package header.
# Format characters:
#   H: unsigned short
#   I: unsigned int
#   i: signed int
#   B: unsigned char
#   s: char[]
TLV_FMT = "2H"
UPGRADE_PKG_HEADER_FMT = "2I64s64s"
UPGRADE_PKG_TIME_FMT = "16s16s"
COMPONENT_INFO_FMT = "H3B"
COMPONENT_SIZE_FMT = "iI"

# Packed sizes (in bytes) of the structures described by the formats above.
TLV_SIZE = 4
UPGRADE_PKG_HEADER_SIZE = 136
UPGRADE_PKG_TIME_SIZE = 32
COMPONENT_INFO_FMT_SIZE = 5
COMPONENT_SIZE_FMT_SIZE = 8

# Sizes of the remaining fields of one component information record.
COMPONENT_ADDR_SIZE = 16
COMPONENT_ADDR_SIZE_L2 = 32
COMPONENT_VERSION_SIZE = 10
COMPONENT_DIGEST_SIZE = 32

# One component information record is: address, info, version, sizes, digest.
# Only the address field differs between the two variants (71 and 87 bytes).
UPGRADE_COMPINFO_SIZE = (
    COMPONENT_ADDR_SIZE + COMPONENT_INFO_FMT_SIZE + COMPONENT_VERSION_SIZE +
    COMPONENT_SIZE_FMT_SIZE + COMPONENT_DIGEST_SIZE)
UPGRADE_COMPINFO_SIZE_L2 = (
    COMPONENT_ADDR_SIZE_L2 + COMPONENT_INFO_FMT_SIZE + COMPONENT_VERSION_SIZE +
    COMPONENT_SIZE_FMT_SIZE + COMPONENT_DIGEST_SIZE)

# The fixed file header holds three TLV heads plus the package header and the
# time block, which is where the first component record starts (180 bytes).
UPGRADE_FILE_HEADER_LEN = \
    3 * TLV_SIZE + UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE
UPGRADE_RESERVE_LEN = 16

# Signature lengths; a 384-byte signature is written after the 256-byte slot.
SIGN_SHA256_LEN = 256
SIGN_SHA384_LEN = 384
UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN

# Chunk size used when hashing the package file.
BLOCK_SIZE = 8192

# TLV type of the package header; the L2 variant is used when
# OPTIONS_MANAGER.not_l2 is false.
HEADER_TLV_TYPE = 0x11
HEADER_TLV_TYPE_L2 = 0x01

# signature algorithm
SIGN_ALGO_RSA = "SHA256withRSA"
SIGN_ALGO_PSS = "SHA256withPSS"
68
69
class CreatePackage(object):
    """
    Create the update.bin file.

    File layout produced by create_package(), in write order:
      1. header TLV + package header, time TLV + time block, component TLV
         (UPGRADE_FILE_HEADER_LEN bytes in total);
      2. one component information record per entry;
      3. the describe_package_id field, padded to UPGRADE_RESERVE_LEN;
      4. hash check data (only when OPTIONS_MANAGER.sd_card is set);
      5. the header sign data (only when header signing succeeds);
      6. the component payloads.
    """

    # Chunk size used when copying a component file into the package, so a
    # large image is never held in memory as a whole.
    _COPY_CHUNK_SIZE = 1024 * 1024

    def __init__(self, head_list, component_list, save_path, key_path):
        """
        :param head_list: package header description; must provide
            entry_count, update_file_version, product_update_id,
            software_version, date, time and describe_package_id
        :param component_list: indexable collection of component descriptions
        :param save_path: path of the update.bin file to create
        :param key_path: path of the PEM private key used for signing
        """
        self.head_list = head_list
        self.component_list = component_list
        self.save_path = save_path
        self.key_path = key_path
        self.compinfo_offset = 0
        self.component_offset = 0
        self.sign_offset = 0
        self.hash_info_offset = 0

        if OPTIONS_MANAGER.not_l2:
            self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE
            self.header_tlv_type = HEADER_TLV_TYPE
        else:
            self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2
            self.header_tlv_type = HEADER_TLV_TYPE_L2

    def verify_param(self):
        """
        Check the constructor arguments before any file is touched.

        return: True when all arguments are present, key_path is not a
            directory and component_list holds at least
            head_list.entry_count items; False (with an error log) otherwise
        """
        required = (self.head_list, self.component_list,
                    self.save_path, self.key_path)
        if any(item is None for item in required):
            UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG)
            return False
        if os.path.isdir(self.key_path):
            UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG)
            return False
        # create_package() indexes component_list[0 .. entry_count - 1], so a
        # shorter list can never produce a valid package.
        try:
            enough_components = \
                len(self.component_list) >= self.head_list.entry_count
        except (TypeError, AttributeError):
            enough_components = False
        if not enough_components:
            UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG)
            return False
        return True

    def write_pkginfo(self, package_file):
        """
        Write the fixed package header at the current position of
        package_file: header TLV, package header, time TLV, time block and
        the TLV head of the component information area.

        return: True on success, False when packing or writing fails
        """
        compinfo_total = self.upgrade_compinfo_size * self.head_list.entry_count
        try:
            # Package header in TLV format (type depends on the L2 option)
            header_tlv = struct.pack(
                TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE)
            pkg_info_length = \
                UPGRADE_RESERVE_LEN + TLV_SIZE + TLV_SIZE + TLV_SIZE + \
                UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \
                compinfo_total
            upgrade_pkg_header = struct.pack(
                UPGRADE_PKG_HEADER_FMT, pkg_info_length,
                self.head_list.update_file_version,
                self.head_list.product_update_id,
                self.head_list.software_version)

            # Type is 2 for time in TLV format
            time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE)
            upgrade_pkg_time = struct.pack(
                UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time)

            # Type is 5 for component in TLV format
            component_tlv = struct.pack(TLV_FMT, 0x05, compinfo_total)
        except struct.error:
            UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False

        pkginfo = b"".join((header_tlv, upgrade_pkg_header, time_tlv,
                            upgrade_pkg_time, component_tlv))
        try:
            package_file.write(pkginfo)
        except IOError:
            UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        UPDATE_LOGGER.print_log("Write package header complete")
        return True

    def _write_compinfo_field(self, package_file, data, field_size):
        """
        Write one field of a component record at compinfo_offset, then advance
        the offset by the fixed field_size (not by len(data)).
        """
        package_file.seek(self.compinfo_offset)
        package_file.write(data)
        self.compinfo_offset += field_size

    def write_component_info(self, component, package_file):
        """
        Write one component information record (address, info, version,
        sizes, digest) at compinfo_offset and advance compinfo_offset.

        return: True on success, False when packing or writing fails
        """
        UPDATE_LOGGER.print_log("component information  StartOffset:%s"\
            % self.compinfo_offset)
        if OPTIONS_MANAGER.not_l2:
            component_addr_size = COMPONENT_ADDR_SIZE
        else:
            component_addr_size = COMPONENT_ADDR_SIZE_L2

        try:
            self._write_compinfo_field(
                package_file, component.component_addr, component_addr_size)

            component_info = struct.pack(
                COMPONENT_INFO_FMT, component.id, component.res_type,
                component.flags, component.type)
            self._write_compinfo_field(
                package_file, component_info, COMPONENT_INFO_FMT_SIZE)

            self._write_compinfo_field(
                package_file, component.version, COMPONENT_VERSION_SIZE)

            component_size = struct.pack(
                COMPONENT_SIZE_FMT, component.size, component.original_size)
            self._write_compinfo_field(
                package_file, component_size, COMPONENT_SIZE_FMT_SIZE)

            self._write_compinfo_field(
                package_file, component.digest, COMPONENT_DIGEST_SIZE)
        except (struct.error, IOError):
            return False
        return True

    def write_component(self, component, package_file):
        """
        Copy the file at component.file_path into package_file at
        component_offset and advance component_offset by the bytes written.

        The file is copied in chunks so its whole content is never loaded
        into memory at once.

        return: True on success, False when reading or writing fails
        """
        UPDATE_LOGGER.print_log("Add component to package  StartOffset:%s"\
            % self.component_offset)
        component_len = 0
        try:
            with open(component.file_path, "rb") as component_file:
                package_file.seek(self.component_offset)
                while True:
                    chunk = component_file.read(self._COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    package_file.write(chunk)
                    component_len += len(chunk)
        except IOError:
            return False
        # Only advance the offset once the whole component has been written.
        self.component_offset += component_len
        UPDATE_LOGGER.print_log("Write component complete  ComponentSize:%s"\
            % component_len)
        return True

    @staticmethod
    def _hash_file_prefix(package_file, length):
        """
        Return the SHA-256 digest of the first `length` bytes of package_file,
        read in pieces of at most BLOCK_SIZE; stops early at end of file.
        """
        hash_sha256 = hashlib.sha256()
        remain_len = length
        package_file.seek(0)
        while remain_len > 0:
            chunk = package_file.read(min(BLOCK_SIZE, remain_len))
            if not chunk:
                break
            hash_sha256.update(chunk)
            remain_len -= len(chunk)
        return hash_sha256.digest()

    def calculate_hash(self, package_file):
        """
        Return the SHA-256 digest of package_file from offset 0 up to
        component_offset.
        """
        return self._hash_file_prefix(package_file, self.component_offset)

    def calculate_header_hash(self, package_file):
        """
        Return the SHA-256 digest of package_file from offset 0 up to
        hash_info_offset.
        """
        return self._hash_file_prefix(package_file, self.hash_info_offset)

    def _load_private_key(self):
        """
        Load the unencrypted PEM private key stored at key_path.

        May raise OSError (file access), ValueError (undecodable key data) or
        TypeError (encrypted key, since no password is supplied).
        """
        with open(self.key_path, 'rb') as f_r:
            key_data = f_r.read()
        return serialization.load_pem_private_key(
            key_data,
            password=None,
            backend=default_backend())

    def sign_digest_with_pss(self, digest):
        """
        Sign `digest` with the key at key_path using PSS padding
        (MGF1/SHA-256, maximum salt length) and SHA-256.

        `digest` is handed to sign() as the message to be signed.

        return: the signature bytes, or False when the key cannot be loaded
            or signing fails
        """
        try:
            private_key = self._load_private_key()
            pad = padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH)
            signature = private_key.sign(digest, pad, hashes.SHA256())
        except (OSError, ValueError, TypeError):
            # TypeError covers an encrypted key (no password is given) and a
            # key type whose sign() does not accept a padding argument.
            return False
        return signature

    def sign_digest(self, digest):
        """
        Sign `digest` with the key at key_path using PKCS#1 v1.5 padding and
        SHA-256.

        `digest` is handed to sign() as the message to be signed.

        return: the signature bytes, or False when the key cannot be loaded
            or signing fails
        """
        try:
            private_key = self._load_private_key()
            signature = private_key.sign(
                digest, padding.PKCS1v15(), hashes.SHA256())
        except (OSError, ValueError, TypeError):
            # TypeError covers an encrypted key (no password is given) and a
            # key type whose sign() does not accept a padding argument.
            return False
        return signature

    def _make_signature(self, sign_algo, digest):
        """
        Sign `digest` with the algorithm named by sign_algo (SIGN_ALGO_RSA or
        SIGN_ALGO_PSS).

        return: the signature bytes, or None (after logging the reason) when
            the algorithm is unknown or signing fails
        """
        if sign_algo == SIGN_ALGO_RSA:
            signature = self.sign_digest(digest)
        elif sign_algo == SIGN_ALGO_PSS:
            signature = self.sign_digest_with_pss(digest)
        else:
            UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return None
        if not signature:
            UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return None
        return signature

    def sign(self, sign_algo):
        """
        Hash the package at save_path up to component_offset, sign the digest
        and write the signature at sign_offset.

        A signature of SIGN_SHA384_LEN bytes is placed SIGN_SHA256_LEN bytes
        further on; sign_offset is advanced accordingly and stays advanced.

        return: True on success, False on any failure
        """
        with open(self.save_path, "rb+") as package_file:
            # calculate hash for .bin package
            digest = self.calculate_hash(package_file)
            if not digest:
                UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
                    log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            # sign .bin package
            signature = self._make_signature(sign_algo, digest)
            if signature is None:
                return False

            if len(signature) == SIGN_SHA384_LEN:
                self.sign_offset += SIGN_SHA256_LEN

            # write signed .bin package
            package_file.seek(self.sign_offset)
            package_file.write(signature)
            UPDATE_LOGGER.print_log(
                ".bin package signing success! SignOffset: %s" % self.sign_offset)
            return True

    def sign_header(self, sign_algo, hash_check_data, package_file):
        """
        Hash package_file up to hash_info_offset, sign the digest, store the
        signature through hash_check_data.write_signdata() and write
        hash_check_data.signdata at hash_info_offset, advancing that offset.

        return: True on success, False on any failure (hash_info_offset is
            left unchanged in that case)
        """
        # calculate hash for .bin package
        digest = self.calculate_header_hash(package_file)
        if not digest:
            UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
                log_type=UPDATE_LOGGER.ERROR_LOG)
            return False

        # sign .bin header
        signature = self._make_signature(sign_algo, digest)
        if signature is None:
            return False

        # write signed .bin header
        hash_check_data.write_signdata(signature)
        package_file.seek(self.hash_info_offset)
        package_file.write(hash_check_data.signdata)
        self.hash_info_offset += len(hash_check_data.signdata)
        UPDATE_LOGGER.print_log(
            ".bin package header signing success! SignOffset: %s" % self.hash_info_offset)
        return True

    def create_package(self):
        """
        Create the update.bin file
        return: update package creation result
        """
        if not self.verify_param():
            UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG)
            return False

        entry_count = self.head_list.entry_count
        hash_check_data = CreateHash(HashType.SHA256, entry_count)
        hash_check_data.write_hashinfo()
        # O_TRUNC: start from an empty file so bytes of an older, larger
        # package at the same path cannot survive behind the new content.
        package_fd = os.open(
            self.save_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
        with os.fdopen(package_fd, "wb+") as package_file:
            # Add information to package
            if not self.write_pkginfo(package_file):
                UPDATE_LOGGER.print_log(
                    "Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            components = [self.component_list[i] for i in range(entry_count)]

            # Add component information to package
            self.compinfo_offset = UPGRADE_FILE_HEADER_LEN
            self.component_offset = UPGRADE_FILE_HEADER_LEN + \
                entry_count * self.upgrade_compinfo_size + \
                UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN
            for component in components:
                UPDATE_LOGGER.print_log("Add component %s"
                    % component.component_addr)
                if not self.write_component_info(component, package_file):
                    UPDATE_LOGGER.print_log("write component info failed: %s"
                        % component.component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False
                if OPTIONS_MANAGER.sd_card and \
                        (not hash_check_data.write_component_hash_data(component)):
                    UPDATE_LOGGER.print_log("write component hash data failed: %s"
                        % component.component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False

            try:
                # Add descriptPackageId to package
                package_file.seek(self.compinfo_offset)
                package_file.write(
                    (self.head_list.describe_package_id.decode().ljust(UPGRADE_RESERVE_LEN, "\0")).encode())
            except IOError:
                UPDATE_LOGGER.print_log(
                    "Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                return False

            self.hash_info_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN
            if OPTIONS_MANAGER.sd_card:
                try:
                    # Add hash check data to package
                    hash_check_data.write_hashdata()
                    hash_blob = \
                        hash_check_data.hashinfo_value + hash_check_data.hashdata
                    package_file.seek(self.hash_info_offset)
                    package_file.write(hash_blob)
                    self.hash_info_offset += len(hash_blob)
                except IOError:
                    UPDATE_LOGGER.print_log(
                        "Add hash check data failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
                    return False

            # NOTE(review): the result of sign_header() is not checked, so a
            # failed header signature does not abort package creation (the
            # failure is still logged by sign_header). Presumably deliberate,
            # e.g. for packages signed elsewhere - confirm before making this
            # fatal.
            self.sign_header(SIGN_ALGO_RSA, hash_check_data, package_file)

            # Component payloads start right after the header sign data.
            self.component_offset = self.hash_info_offset
            for component in components:
                if not self.write_component(component, package_file):
                    UPDATE_LOGGER.print_log("write component failed: %s"
                        % component.component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False

        UPDATE_LOGGER.print_log("Write update package complete")
        return True