• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2022 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import io
18import os
19import hashlib
20import struct
21
22from log_exception import UPDATE_LOGGER
23from asn1crypto import cms
24from asn1crypto import pem
25from asn1crypto import util
26from asn1crypto import x509
27from cryptography.hazmat.backends import default_backend
28from cryptography.hazmat.primitives import serialization
29from cryptography.hazmat.primitives.asymmetric import padding
30from cryptography.hazmat.primitives import hashes
31
# Directory containing this script; the signing certificate path is built
# relative to it.
operation_path = os.path.dirname(os.path.realpath(__file__))
CERT_PATH = os.path.join(operation_path, 'sign_cert/signing_cert.crt')
# Chunk size, in bytes, used when streaming package data (hashing, copying).
BLCOK_SIZE = 8192
# Size in bytes of the signed-package footer: three unsigned shorts packed
# with SIGANTURE_FOOTER_FORMAT.
FOOTER_LENGTH = 6
# Number of trailing package bytes excluded from the package hash.
# NOTE(review): presumably the size of a zip end-of-central-directory record
# without comment - confirm.
ZIP_ECOD_LENGTH = 22
# Type tag packed ahead of the digest in the encapsulated content header.
DIGEST_SHA256 = 672
# Digest length value packed into the encapsulated content header; matches
# the 32-byte field of CONTENT_INFO_FORMAT.
SHA256_HASH_LEN = 32

# encapsulated content layout (little-endian): type tag (unsigned short),
# digest length (unsigned short), 32-byte digest
CONTENT_INFO_FORMAT = "<2H32s"
# the length of zip eocd comment (little-endian unsigned short)
ZIP_EOCD_COMMENT_LEN_FORMAT = "<H"
# signed package footer (three little-endian unsigned shorts)
SIGANTURE_FOOTER_FORMAT = "<3H"
45
46
def load_public_cert():
    """
    Load the signing certificate stored at CERT_PATH.

    The file may be PEM-armored or raw DER; PEM input is unwrapped to its
    DER payload before parsing.

    :return: the parsed certificate as an asn1crypto x509.Certificate
    """
    with open(CERT_PATH, 'rb') as cert_file:
        cert_bytes = cert_file.read()

    if pem.detect(cert_bytes):
        # unarmor() yields (type name, headers, DER bytes); only the DER
        # payload is needed here.
        cert_bytes = pem.unarmor(cert_bytes)[2]

    return x509.Certificate.load(cert_bytes)
54
55
def calculate_package_hash(package_path):
    """
    Compute the SHA-256 digest of a package file, leaving out its final
    ZIP_ECOD_LENGTH bytes.

    :param package_path: path of the package file to hash
    :return: raw digest bytes from hashlib.sha256()
    """
    sha256_ctx = hashlib.sha256()
    bytes_left = os.path.getsize(package_path) - ZIP_ECOD_LENGTH

    with open(package_path, 'rb') as package_file:
        # Stream the file in BLCOK_SIZE chunks; the last chunk is shortened
        # so that the excluded tail is never fed to the hash.
        while bytes_left > 0:
            step = min(BLCOK_SIZE, bytes_left)
            sha256_ctx.update(package_file.read(step))
            bytes_left -= step

    return sha256_ctx.digest()
72
73
def sign_digest_with_pss(digset, private_key_file):
    """
    Sign data with a PEM private key using PSS padding (MGF1/SHA-256,
    maximum salt length) and SHA-256.

    hashes.SHA256() is passed as the signing algorithm, so the library
    hashes ``digset`` itself before signing.

    :param digset: bytes to sign
    :param private_key_file: path of an unencrypted PEM private key file
    :return: the signature bytes, or False when the key file cannot be
        read, the key cannot be loaded, or signing fails
    """
    try:
        # read private key from pem file
        with open(private_key_file, 'rb') as f_r:
            key_data = f_r.read()

        private_key = serialization.load_pem_private_key(
            key_data,
            password=None,
            backend=default_backend())
        pad = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH)

        signature = private_key.sign(
            digset,
            pad,
            hashes.SHA256()
        )
    except (OSError, ValueError, TypeError) as err:
        # TypeError is what load_pem_private_key raises for an encrypted key
        # when no password is supplied; report the cause instead of failing
        # silently.
        UPDATE_LOGGER.print_log(
            "sign digest with pss failed! key file: %s, error: %s"
            % (private_key_file, err),
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    return signature
96
97
def sign_digest(digset, private_key_file):
    """
    Sign data with a PEM private key using PKCS#1 v1.5 padding and SHA-256.

    hashes.SHA256() is passed as the signing algorithm, so the library
    hashes ``digset`` itself before signing.

    :param digset: bytes to sign
    :param private_key_file: path of an unencrypted PEM private key file
    :return: the signature bytes, or False when the key file cannot be
        read, the key cannot be loaded, or signing fails
    """
    try:
        # read private key from pem file
        with open(private_key_file, 'rb') as f_r:
            key_data = f_r.read()

        private_key = serialization.load_pem_private_key(
            key_data,
            password=None,
            backend=default_backend())

        signature = private_key.sign(
            digset,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
    except (OSError, ValueError, TypeError) as err:
        # TypeError is what load_pem_private_key raises for an encrypted key
        # when no password is supplied; report the cause instead of failing
        # silently.
        UPDATE_LOGGER.print_log(
            "sign digest failed! key file: %s, error: %s"
            % (private_key_file, err),
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    return signature
117
118
def create_encap_content_info(diget):
    """
    Pack a package digest into the binary content header.

    The result is laid out according to CONTENT_INFO_FORMAT: the
    DIGEST_SHA256 type tag, the SHA256_HASH_LEN length field, then the
    digest itself.

    :param diget: digest bytes of the package
    :return: the packed header bytes, or False when ``diget`` is empty/None
    """
    if not diget:
        # The original message ended in "file: %s" but no file name is
        # available in this function, so the dangling placeholder is dropped.
        UPDATE_LOGGER.print_log("calc package hash failed!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    return struct.pack(CONTENT_INFO_FORMAT, DIGEST_SHA256,
        SHA256_HASH_LEN, diget)
127
128
def write_signed_package(unsigned_package, signature, signed_package):
    """
    Write the signed package file.

    Output layout, in order:
      1. the unsigned package without its final 2 bytes,
      2. a 2-byte length field (signature length + FOOTER_LENGTH),
      3. the signature bytes,
      4. the footer: (total size, 0xffff, total size) as unsigned shorts.

    :param unsigned_package: path of the package to copy from
    :param signature: signature bytes to append
    :param signed_package: path of the output file (created if missing)
    :raises struct.error: if the signature plus footer does not fit in an
        unsigned short; raised before the output file is touched
    """
    signature_total_size = len(signature) + FOOTER_LENGTH

    # Pack both structures first so that a size overflow is reported before
    # the output file is created or modified.
    zip_comment_len = struct.pack(ZIP_EOCD_COMMENT_LEN_FORMAT,
            signature_total_size)
    footter = struct.pack(SIGANTURE_FOOTER_FORMAT, signature_total_size,
            0xffff, signature_total_size)

    # The last 2 bytes of the unsigned package are replaced by the new
    # length field written below.
    remain_len = os.path.getsize(unsigned_package) - 2

    package_fd = os.open(signed_package, os.O_RDWR | os.O_CREAT, 0o755)
    with os.fdopen(package_fd, 'wb') as f_signed, \
            open(unsigned_package, 'rb') as f_unsign:
        while remain_len > 0:
            step = min(BLCOK_SIZE, remain_len)
            f_signed.write(f_unsign.read(step))
            remain_len -= step

        f_signed.write(zip_comment_len)
        f_signed.write(signature)
        f_signed.write(footter)
        # The file is opened without O_TRUNC, so a longer pre-existing output
        # would keep stale bytes after the footer. Truncating at the end
        # (rather than on open) also keeps in-place signing working.
        f_signed.truncate()
156
157
def sign_ota_package(package_path, signed_package, private_key):
    """
    Sign an OTA package and write the signed package.

    Steps: hash the package, sign the hash with the private key, wrap
    digest, certificate and signature into a CMS SignedData structure, and
    append its DER encoding to a copy of the package.

    Side effect: the raw digest and signature are also written to files
    named "digest" and "signature" in the current working directory.

    :param package_path: path of the unsigned package
    :param signed_package: path of the signed package to write
    :param private_key: path of the PEM private key file
    :return: True on success, False when hashing or signing failed
    """
    digest = calculate_package_hash(package_path)
    data = create_encap_content_info(digest)
    signature = sign_digest(digest, private_key)
    if not data or not signature:
        # Both helpers return False on failure; stop here instead of hitting
        # a TypeError when the value is written out below.
        UPDATE_LOGGER.print_log(
            "sign ota package failed! file: %s" % package_path,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    # O_TRUNC keeps stale bytes of an older, longer dump from surviving.
    dump_flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC
    for dump_name, payload in (("digest", digest), ("signature", signature)):
        with os.fdopen(os.open(dump_name, dump_flags, 0o755), 'wb') as dump:
            dump.write(payload)

    # Creating a SignedData object from cms
    signed_data = cms.SignedData()
    signed_data['version'] = 'v1'
    signed_data['encap_content_info'] = util.OrderedDict([
        ('content_type', 'data'),
        ('content', data)])

    signed_data['digest_algorithms'] = [util.OrderedDict([
        ('algorithm', 'sha256'),
        ('parameters', None)])]

    cert = load_public_cert()

    # Adding this certificate to SignedData object
    signed_data['certificates'] = [cert]

    # Setting signer info section
    signer_info = cms.SignerInfo()
    signer_info['version'] = 'v1'
    signer_info['digest_algorithm'] = util.OrderedDict([
        ('algorithm', 'sha256'),
        ('parameters', None)])
    signer_info['signature_algorithm'] = util.OrderedDict([
        ('algorithm', 'sha256_rsa'),
        ('parameters', None)])

    # The signer is identified by issuer and serial number only. The key
    # identifier is not used, so it is no longer read: reading it failed for
    # certificates without a Subject Key Identifier extension.
    issuer_and_serial = cms.IssuerAndSerialNumber()
    issuer_and_serial['issuer'] = cert.issuer
    issuer_and_serial['serial_number'] = cert.serial_number
    signer_info['sid'] = cms.SignerIdentifier({
        'issuer_and_serial_number': issuer_and_serial})

    signer_info['signature'] = signature
    # Adding SignerInfo object to SignedData object
    signed_data['signer_infos'] = [signer_info]

    # Writing everything into ASN.1 object
    asn1obj = cms.ContentInfo()
    asn1obj['content_type'] = 'signed_data'
    asn1obj['content'] = signed_data

    # The DER dump of the ASN.1 object is appended to the package
    write_signed_package(package_path, asn1obj.dump(), signed_package)
    return True