1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2022 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import io 18import os 19import hashlib 20import struct 21 22from log_exception import UPDATE_LOGGER 23from asn1crypto import cms 24from asn1crypto import pem 25from asn1crypto import util 26from asn1crypto import x509 27from cryptography.hazmat.backends import default_backend 28from cryptography.hazmat.primitives import serialization 29from cryptography.hazmat.primitives.asymmetric import padding 30from cryptography.hazmat.primitives import hashes 31 32operation_path = os.path.dirname(os.path.realpath(__file__)) 33CERT_PATH = os.path.join(operation_path, 'sign_cert/signing_cert.crt') 34BLCOK_SIZE = 8192 35FOOTER_LENGTH = 6 36ZIP_ECOD_LENGTH = 22 37DIGEST_SHA256 = 672 38SHA256_HASH_LEN = 32 39 40CONTENT_INFO_FORMAT = "<2H32s" 41# the length of zip eocd comment 42ZIP_EOCD_COMMENT_LEN_FORMAT = "<H" 43# signed package footer 44SIGANTURE_FOOTER_FORMAT = "<3H" 45 46 47def load_public_cert(): 48 with open(CERT_PATH, 'rb') as cert_file: 49 der_bytes = cert_file.read() 50 if pem.detect(der_bytes): 51 type_name, headers, der_bytes = pem.unarmor(der_bytes) 52 53 return x509.Certificate.load(der_bytes) 54 55 56def calculate_package_hash(package_path): 57 """ 58 :return: (hash) for path using hashlib.sha256() 59 """ 60 hash_sha256 = hashlib.sha256() 61 length = 0 62 63 remain_len = os.path.getsize(package_path) - ZIP_ECOD_LENGTH 64 with open(package_path, 'rb') as package_file: 65 while remain_len > BLCOK_SIZE: 66 hash_sha256.update(package_file.read(BLCOK_SIZE)) 67 remain_len -= BLCOK_SIZE 68 if remain_len > 0: 69 hash_sha256.update(package_file.read(remain_len)) 70 71 return hash_sha256.digest() 72 73 74def sign_digest_with_pss(digset, private_key_file): 75 # read private key from pem file 76 try: 77 with open(private_key_file, 'rb') as f_r: 78 key_data = f_r.read() 79 80 private_key = serialization.load_pem_private_key( 81 key_data, 82 password=None, 83 backend=default_backend()) 84 pad = padding.PSS( 85 mgf=padding.MGF1(hashes.SHA256()), 86 salt_length=padding.PSS.MAX_LENGTH) 87 88 signature = private_key.sign( 89 digset, 90 pad, 91 hashes.SHA256() 92 ) 93 except (OSError, ValueError): 94 return False 95 return signature 96 97 98def sign_digest(digset, private_key_file): 99 # read private key from pem file 100 try: 101 with open(private_key_file, 'rb') as f_r: 102 key_data = f_r.read() 103 104 private_key = serialization.load_pem_private_key( 105 key_data, 106 password=None, 107 backend=default_backend()) 108 109 signature = private_key.sign( 110 digset, 111 padding.PKCS1v15(), 112 hashes.SHA256() 113 ) 114 except (OSError, ValueError): 115 return False 116 return signature 117 118 119def create_encap_content_info(diget): 120 if not diget: 121 UPDATE_LOGGER.print_log("calc package hash failed! file: %s", 122 log_type=UPDATE_LOGGER.ERROR_LOG) 123 return False 124 content_header = struct.pack(CONTENT_INFO_FORMAT, DIGEST_SHA256, 125 SHA256_HASH_LEN, diget) 126 return content_header 127 128 129def write_signed_package(unsigned_package, signature, signed_package): 130 """ 131 :Write signature to signed package 132 """ 133 signature_size = len(signature) 134 signature_total_size = signature_size + FOOTER_LENGTH 135 136 package_fd = os.open(signed_package, os.O_RDWR | os.O_CREAT, 0o755) 137 f_signed = os.fdopen(package_fd, 'wb') 138 139 remain_len = os.path.getsize(unsigned_package) - 2 140 with open(unsigned_package, 'rb') as f_unsign: 141 while remain_len > BLCOK_SIZE: 142 f_signed.write(f_unsign.read(BLCOK_SIZE)) 143 remain_len -= BLCOK_SIZE 144 if remain_len > 0: 145 f_signed.write(f_unsign.read(remain_len)) 146 147 zip_comment_len = struct.pack(ZIP_EOCD_COMMENT_LEN_FORMAT, 148 signature_total_size) 149 f_signed.write(zip_comment_len) 150 151 f_signed.write(signature) 152 footter = struct.pack(SIGANTURE_FOOTER_FORMAT, signature_total_size, 153 0xffff, signature_total_size) 154 f_signed.write(footter) 155 f_signed.close() 156 157 158def sign_ota_package(package_path, signed_package, private_key): 159 digest = calculate_package_hash(package_path) 160 data = create_encap_content_info(digest) 161 signature = sign_digest(digest, private_key) 162 163 digest_fd = os.open("digest", os.O_RDWR | os.O_CREAT, 0o755) 164 digest_file = os.fdopen(digest_fd, 'wb') 165 digest_file.write(digest) 166 digest_file.close() 167 168 signatute_fd = os.open("signature", os.O_RDWR | os.O_CREAT, 0o755) 169 signatute_file = os.fdopen(signatute_fd, 'wb') 170 signatute_file.write(signature) 171 signatute_file.close() 172 173 # Creating a SignedData object from cms 174 signed_data = cms.SignedData() 175 signed_data['version'] = 'v1' 176 signed_data['encap_content_info'] = util.OrderedDict([ 177 ('content_type', 'data'), 178 ('content', data)]) 179 180 signed_data['digest_algorithms'] = [util.OrderedDict([ 181 ('algorithm', 'sha256'), 182 ('parameters', None)])] 183 184 cert = load_public_cert() 185 186 # Adding this certificate to SignedData object 187 signed_data['certificates'] = [cert] 188 189 # Setting signer info section 190 signer_info = cms.SignerInfo() 191 signer_info['version'] = 'v1' 192 signer_info['digest_algorithm'] = util.OrderedDict([ 193 ('algorithm', 'sha256'), 194 ('parameters', None)]) 195 signer_info['signature_algorithm'] = util.OrderedDict([ 196 ('algorithm', 'sha256_rsa'), 197 ('parameters', None)]) 198 199 issuer = cert.issuer 200 serial_number = cert.serial_number 201 issuer_and_serial = cms.IssuerAndSerialNumber() 202 issuer_and_serial['issuer'] = cert.issuer 203 issuer_and_serial['serial_number'] = cert.serial_number 204 205 key_id = cert.key_identifier_value.native 206 signer_info['sid'] = cms.SignerIdentifier({ 207 'issuer_and_serial_number': issuer_and_serial}) 208 209 signer_info['signature'] = signature 210 # Adding SignerInfo object to SignedData object 211 signed_data['signer_infos'] = [signer_info] 212 213 # Writing everything into ASN.1 object 214 asn1obj = cms.ContentInfo() 215 asn1obj['content_type'] = 'signed_data' 216 asn1obj['content'] = signed_data 217 218 # This asn1obj can be dumped to a disk using dump() method (DER format) 219 write_signed_package(package_path, asn1obj.dump(), signed_package) 220 return True