#!/usr/bin/env python
# coding=utf-8

#
# Copyright (C) 2022 Huawei Technologies Co., Ltd.
# Licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan
# PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#     http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
# KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
# NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
#
"""Generate the signed config section of a TA.

The section written by :func:`gen_config_section` is the concatenation of a
44-byte header, the TA certificate, the TLV-encoded ``configs.xml``, the
signature over (header + TA certificate + config) and the config certificate.
"""

import struct
import os
import stat
import sys
import hashlib
import shutil
import re
import xml.etree.ElementTree as ET
import subprocess
import configparser
import logging

CONFIG_VERSION = 1
CONFIG_CERT_PATH = './config_cert'

BASE_POLICY_VERSION_TEE = 0b001
BASE_POLICY_VERSION_OH = 0b101

# Bit position (inside the policy version) that selects the xml->tlv tool.
XML2TLV_PARSE_TOOL_INDEX = 1
# use java to parse xml
XML2TLV_JAR_VALUE = 0 << XML2TLV_PARSE_TOOL_INDEX
# use python to parse xml
XML2TLV_PY_VALUE = 1 << XML2TLV_PARSE_TOOL_INDEX
logging.basicConfig(level=logging.INFO)

# Characters accepted in paths and configuration values.
_WHITELIST_PATTERN = re.compile(r"[A-Za-z0-9\/\-_.]+")


def get_policy_version(config_file):
    """Return the policy version to store in the config header.

    The base version is ``BASE_POLICY_VERSION_OH`` when ``config_file``
    exists and its ``configPolicy`` is ``"0"``, otherwise
    ``BASE_POLICY_VERSION_TEE``.  The parse-tool bit is then OR-ed in:
    the jar value if ``./xml2tlv.jar`` exists, the python value otherwise.

    Exits the process with status 1 if the configuration file is malformed.
    """
    if not os.path.exists(config_file):
        base_policy_ver = BASE_POLICY_VERSION_TEE
    else:
        cfg = Configuration(config_file)
        if cfg.check_cfg_format():
            sys.exit(1)
        if cfg.policy == "0":
            base_policy_ver = BASE_POLICY_VERSION_OH
        else:
            base_policy_ver = BASE_POLICY_VERSION_TEE

    if os.path.isfile("./xml2tlv.jar"):
        return base_policy_ver | XML2TLV_JAR_VALUE
    return base_policy_ver | XML2TLV_PY_VALUE


def integer_check(intput_str):
    """Return 0 if ``str(intput_str)`` consists only of digits, else 1."""
    return 0 if str(intput_str).isdigit() else 1


def whitelist_check(intput_str):
    """Return 0 if ``intput_str`` only holds whitelisted characters, else 1.

    ``fullmatch`` is used on purpose: ``match`` with a trailing ``$`` would
    also accept a string that ends with a newline.
    """
    if _WHITELIST_PATTERN.fullmatch(intput_str) is None:
        return 1
    return 0


def run_cmd(command):
    """Run ``command`` (an argument list) without a shell.

    Logs an error and exits the process with status 1 when the command
    returns a non-zero status.
    """
    # check=False: with check=True a failure raises before the return code
    # can be inspected, which made the error branch below unreachable.
    ret = subprocess.run(command, shell=False, check=False)
    if ret.returncode != 0:
        logging.error("run command failed.")
        sys.exit(1)


def _write_private_file(path, *chunks):
    """Write ``chunks`` (bytes) to ``path``, created with owner-only rw mode.

    The file is truncated first so that a longer pre-existing file cannot
    leave stale bytes after the new content.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    mode = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(path, flags, mode), "wb") as out_fp:
        for chunk in chunks:
            out_fp.write(chunk)


def _tag_parse_dict_path():
    """Return the path of ``tag_parse_dict.csv`` in the working directory."""
    csv_dir = os.path.realpath(os.path.join(os.getcwd(), './'))
    return os.path.join(csv_dir, './tag_parse_dict.csv')


class LoadConfigHeader:
    """Config section header: twelve fields packed as ``IHHIIIIIIIII``."""

    # NOTE: the attribute name shadows the builtin inside the class body
    # only; it is kept because it is part of the class' visible interface.
    str = struct.Struct('IHHIIIIIIIII')

    def __init__(self, data):
        """Unpack ``data`` (``str`` or bytes-like, exactly the struct size).

        Raises ``struct.error`` when the length does not match.
        """
        raw = data.encode() if isinstance(data, str) else bytes(data)
        unpacked_data = LoadConfigHeader.str.unpack(raw)
        self.unpacked_data = unpacked_data
        self.magic_num = unpacked_data[0]
        self.version = unpacked_data[1]
        self.policy_version = unpacked_data[2]
        self.context_len = unpacked_data[3]
        self.ta_cert_len = unpacked_data[4]
        self.config_len = unpacked_data[5]
        self.sign_len = unpacked_data[6]
        self.cfg_cert_len = unpacked_data[7]
        # Each reserved field has its own slot (they previously all read
        # slot 8).
        self.reserved1 = unpacked_data[8]
        self.reserved2 = unpacked_data[9]
        self.reserved3 = unpacked_data[10]
        self.reserved4 = unpacked_data[11]

    def get_packed_data(self):
        """Return the header fields packed back into bytes."""
        values = [
            self.magic_num,
            self.version,
            self.policy_version,
            self.context_len,
            self.ta_cert_len,
            self.config_len,
            self.sign_len,
            self.cfg_cert_len,
            self.reserved1,
            self.reserved2,
            self.reserved3,
            self.reserved4,
        ]
        return LoadConfigHeader.str.pack(*values)


def pkg_config_header(hdr_len, magic_num, version, policy_version,
                      context_len, ta_cert_len, config_len, sign_len,
                      cfg_cert_len):
    """Build a ``LoadConfigHeader`` from a zeroed buffer of ``hdr_len`` bytes.

    The reserved fields stay zero; all other fields are set from the
    arguments.
    """
    config_hd = LoadConfigHeader('\0' * hdr_len)
    config_hd.magic_num = magic_num
    config_hd.version = version
    config_hd.policy_version = policy_version
    config_hd.context_len = context_len
    config_hd.ta_cert_len = ta_cert_len
    config_hd.config_len = config_len
    config_hd.sign_len = sign_len
    config_hd.cfg_cert_len = cfg_cert_len
    return config_hd


#----------------------------------------------------------------------------
# generate hash use SHA256
#----------------------------------------------------------------------------
def generate_sha256_hash(in_buf):
    """Return the SHA-256 digest (bytes) of ``in_buf``."""
    return hashlib.sha256(in_buf).digest()


def gen_rsa_signature(sign_conf_alg, config_buf, input_path_gen, output_file):
    """Sign ``config_buf`` with RSA-PSS/SHA-256 through ``openssl dgst``.

    The message is staged in ``<input_path_gen>/temp/config_msg`` and the
    signature is written to ``output_file``.

    Exits with status 1 if ``sign_conf_alg`` is not ``"RSA_PSS"``.
    Raises ``RuntimeError`` if openssl cannot be run or fails.
    """
    if sign_conf_alg != "RSA_PSS":
        logging.error("Sign Config alg is not support!")
        # This is a failure: do not report success to the caller's shell.
        sys.exit(1)

    pri_key = os.path.join(CONFIG_CERT_PATH, "taconfig_key.pem")
    msg_file = os.path.join(input_path_gen, "temp/config_msg")
    _write_private_file(msg_file, config_buf)

    cmd = ["openssl", "dgst", "-sign", pri_key, "-sha256",
           "-sigopt", "rsa_padding_mode:pss",
           "-sigopt", "rsa_pss_saltlen:-1",
           "-out", output_file, msg_file]
    try:
        subprocess.check_output(cmd, shell=False)
    except (OSError, subprocess.SubprocessError) as err:
        logging.error("sign operation failed")
        raise RuntimeError("sign operation failed") from err
    logging.info("Sign Config with PSS Success")


def gen_ecdsa_signature(config_buf, input_path_gen, output_file):
    """Sign ``config_buf`` with the config key and SHA-256 via ``openssl dgst``.

    The message is staged in ``<input_path_gen>/temp/config_msg`` and the
    signature is written to ``output_file``.  Exits with status 1 when
    openssl fails (see :func:`run_cmd`).
    """
    msg_file = os.path.join(input_path_gen, "temp/config_msg")
    _write_private_file(msg_file, config_buf)

    pri_key = os.path.join(CONFIG_CERT_PATH, "taconfig_key.pem")
    cmd = ["openssl", "dgst", "-sha256", "-sign", pri_key,
           "-out", output_file, msg_file]
    run_cmd(cmd)
    logging.critical('Sign Config Success')


def gen_config_sign(sign_conf_alg, input_path_gen, header,
                    config, ta_cert, output_file):
    """Sign ``header + ta_cert + config`` and write the result to ``output_file``.

    The data to sign is also stored in ``<input_path_gen>/temp/file_to_sign``.
    ``"ECDSA"`` selects the ECDSA signer; every other value is passed to the
    RSA signer.
    """
    temp_file = os.path.join(input_path_gen, "temp/file_to_sign")
    _write_private_file(temp_file, header, ta_cert, config)
    config_buf = header + ta_cert + config

    if sign_conf_alg == "ECDSA":
        gen_ecdsa_signature(config_buf, input_path_gen, output_file)
    else:  # rsa
        gen_rsa_signature(sign_conf_alg, config_buf,
                          input_path_gen, output_file)


def convert_xml2tlv(xml_file, tlv_file, input_path, config_file):
    """Convert ``xml_file`` to TLV form in ``tlv_file``.

    The tool (``xml2tlv.jar`` or ``dyn_conf_parser``) is selected by the
    parse-tool bit of the policy version.

    Raises ``RuntimeError`` if the TLV file was not produced or the policy
    version selects no known tool.
    """
    policy_ver = get_policy_version(config_file)
    parse_tool = policy_ver & (1 << XML2TLV_PARSE_TOOL_INDEX)

    if parse_tool == XML2TLV_JAR_VALUE:
        run_cmd(["java", "-jar", "xml2tlv.jar", xml_file, tlv_file])
    elif parse_tool == XML2TLV_PY_VALUE:
        from dyn_conf_parser import parser_config_xml
        parser_config_xml(xml_file, _tag_parse_dict_path(),
                          tlv_file, input_path)
    else:
        logging.error('invlid policy version')
        raise RuntimeError('invlid policy version')

    # Both tools are held to the same post-condition.
    if not os.path.isfile(tlv_file):
        logging.error('convert xml to tlv failed')
        raise RuntimeError('convert xml to tlv failed')
    logging.info('convert xml to tlv success')


def creat_temp_folder(input_path_creat):
    """(Re)create an empty ``temp`` directory inside ``input_path_creat``."""
    temp_path = os.path.join(input_path_creat, 'temp')
    if os.path.exists(temp_path):
        shutil.rmtree(temp_path)
    os.mkdir(temp_path)


def delete_temp_folder(input_path_delete):
    """Remove ``temp`` and ``config_tlv`` from ``input_path_delete`` if present."""
    temp_path = os.path.join(input_path_delete, "temp")
    if os.path.exists(temp_path):
        shutil.rmtree(temp_path)
    tlv_path = os.path.join(input_path_delete, "config_tlv")
    if os.path.exists(tlv_path):
        os.remove(tlv_path)


def check_dyn_perm(xml_config_file, input_path):
    """Strip top-level ``drv_perm`` elements out of ``xml_config_file``.

    When at least one ``drv_perm`` child of the root exists:

    * if ``<input_path>/temp`` exists, the unmodified document is saved to
      ``temp/configs_bak.xml`` and the last ``drv_perm`` element is written
      to ``temp/dyn_perm.xml``;
    * every ``drv_perm`` child is removed and ``xml_config_file`` is
      rewritten in place.

    Returns 1 if a ``drv_perm`` element was found, otherwise 0 (and nothing
    is written).
    """
    xml_tree = ET.parse(xml_config_file)
    xml_root = xml_tree.getroot()
    drv_perm_nodes = xml_root.findall('drv_perm')
    if not drv_perm_nodes:
        return 0

    temp_dir = os.path.join(input_path, 'temp')

    # Save the backup once, before any removal, so it holds the complete
    # original document even when several drv_perm elements are present.
    if os.path.exists(temp_dir):
        out_save_file = os.path.join(input_path, 'temp/configs_bak.xml')
        xml_tree.write(out_save_file, encoding="utf-8")

    for node in drv_perm_nodes:
        xml_root.remove(node)

    if os.path.exists(temp_dir):
        out_file = os.path.join(input_path, 'temp/dyn_perm.xml')
        ET.ElementTree(drv_perm_nodes[-1]).write(out_file, encoding="utf-8")
    xml_tree.write(xml_config_file)
    return 1


def get_target_type_in_config(config_path, in_path):
    """Write ``<in_path>/config_tlv`` when the manifest target type is ``"1"``.

    The file receives the single line ``gpd.ta.dynConf:00000``.  Nothing is
    written when ``./TA_Manifest_Info/target_type`` is absent or has another
    value.
    """
    tree = ET.parse(config_path)
    drv_target_type = tree.find('./TA_Manifest_Info/target_type')
    if drv_target_type is None or drv_target_type.text != "1":
        return

    flag = os.O_RDWR | os.O_TRUNC | os.O_CREAT
    mode = stat.S_IWUSR | stat.S_IRUSR
    out_tlv = os.path.join(in_path, 'config_tlv')
    with os.fdopen(os.open(out_tlv, flag, mode), 'w+') as conf:
        conf.write("gpd.ta.dynConf:00000\n")


class Configuration:
    """Values read from the ``signConfigPrivateCfg`` section of an ini file."""

    # Class-level defaults; __init__ overrides both on every instance.
    sign_alg = "RSA_PKCS1"
    policy = "1"

    def __init__(self, file_name):
        """Read ``configSignAlg`` and ``configPolicy`` from ``file_name``.

        Raises a ``configparser`` error if the section or an option is
        missing.
        """
        parser = configparser.ConfigParser()
        parser.read(file_name)
        self.sign_alg = parser.get("signConfigPrivateCfg", "configSignAlg")
        self.policy = parser.get("signConfigPrivateCfg", "configPolicy")

    def check_cfg_format(self):
        """Return 0 if both values pass the whitelist check, else 1."""
        if whitelist_check(self.sign_alg):
            logging.error("configSignAlg is invalid")
            return 1
        if whitelist_check(self.policy):
            logging.error("configPolicy is invalid")
            return 1
        return 0


def pack_signature(signature_path, signature_size, target_size=72):
    """Left-pad the signature file with zero bytes up to ``target_size``.

    The first ``signature_size`` bytes of the file are kept and rewritten
    after ``target_size - signature_size`` zero bytes.  When the signature
    is already at least ``target_size`` long no padding is added.
    """
    padding = b'\x00' * (target_size - signature_size)
    with open(signature_path, 'rb+') as signature_file:
        signature_buf = signature_file.read(signature_size)
        signature_file.seek(0)
        signature_file.write(padding + signature_buf)


def gen_config_header(sign_conf_alg, config_content_size, ta_cert_size,
                      config_cert_size, config_file):
    """Build the config header and return ``(header, final_sign_size)``.

    ``final_sign_size`` is the number of signature bytes stored in the
    section: 72 for ``"ECDSA"``, 512 otherwise.  The ``sign_len`` header
    field additionally carries ``0xC0000000`` for ECDSA and ``0x80000000``
    for ``"RSA_PSS"``.
    """
    config_hd_len = 44
    if sign_conf_alg == "ECDSA":
        final_sign_size = 72
        config_sign_size = final_sign_size | 0xC0000000
    else:  # rsa
        final_sign_size = 512
        config_sign_size = final_sign_size
        if sign_conf_alg == "RSA_PSS":
            config_sign_size |= 0x80000000

    # NOTE(review): the flag-tagged sign size (not the plain byte count) is
    # added into the context length; kept as is because the consumer of the
    # header is not visible here -- confirm this is intended.
    config_context_size = (config_content_size + ta_cert_size
                           + config_sign_size + config_cert_size)
    config_header = pkg_config_header(
        config_hd_len, 0xABCDABCD, CONFIG_VERSION,
        get_policy_version(config_file), config_context_size,
        ta_cert_size, config_content_size, config_sign_size,
        config_cert_size)
    return config_header, final_sign_size


def get_tlv_buffer(input_path, config_file):
    """Convert ``<input_path>/configs.xml`` to TLV and return ``(buf, size)``.

    When the xml holds ``drv_perm`` elements they are handed to
    ``parser_dyn_conf`` and stripped before conversion; the original
    ``configs.xml`` is restored afterwards, also when conversion fails.
    Otherwise the xml is converted directly and the target type is checked.
    If ``<input_path>/config_tlv`` exists afterwards, its content plus a
    newline is appended to the TLV data.
    """
    # convert xml to tlv
    tlv_dynconf_data = os.path.join(input_path, "config_tlv")
    xml_config_file = os.path.join(input_path, "configs.xml")
    tlv_config_file = os.path.join(input_path, "temp/configs_tlv")

    if check_dyn_perm(xml_config_file, input_path) != 0:
        src_file_path = os.path.join(input_path, 'temp/configs_bak.xml')
        try:
            from dyn_conf_parser import parser_dyn_conf
            dyn_conf_xml_file_path = os.path.join(input_path,
                                                  'temp/dyn_perm.xml')
            parser_dyn_conf(dyn_conf_xml_file_path, "",
                            _tag_parse_dict_path(), input_path)
            convert_xml2tlv(xml_config_file, tlv_config_file,
                            input_path, config_file)
        finally:
            # check_dyn_perm rewrote configs.xml; always put the original
            # back so a failed run does not leave the input modified.
            shutil.move(src_file_path, xml_config_file)
    else:
        convert_xml2tlv(xml_config_file, tlv_config_file,
                        input_path, config_file)
        get_target_type_in_config(xml_config_file, input_path)

    with open(tlv_config_file, 'rb') as tlv_config_fp:
        tlv_config_buf = tlv_config_fp.read()
    if os.path.exists(tlv_dynconf_data):
        with open(tlv_dynconf_data, 'rb') as tlv_dynconf_fp:
            tlv_config_buf = tlv_config_buf + tlv_dynconf_fp.read() + b"\n"
    return tlv_config_buf, len(tlv_config_buf)


def gen_config_section(input_path, cert_path, config_section):
    """Build the signed config section and write it to ``config_section``.

    Reads ``configs.xml`` (and, if present,
    ``config_tee_private_sample.ini``) from ``input_path``, the TA
    certificate from ``cert_path`` and the config certificate/key from
    ``CONFIG_CERT_PATH``.  Without the ini file the signing algorithm
    defaults to ``"RSA_PSS"``.  The temporary files are removed at the end.
    """
    creat_temp_folder(input_path)
    config_file = os.path.join(input_path, "config_tee_private_sample.ini")
    if not os.path.exists(config_file):
        logging.critical("config_tee_private_sample.ini is not exist.")
        sign_conf_alg = "RSA_PSS"
    else:
        cfg = Configuration(config_file)
        if cfg.check_cfg_format():
            sys.exit(1)
        sign_conf_alg = cfg.sign_alg

    tlv_config_buf, config_content_size = get_tlv_buffer(input_path,
                                                         config_file)
    with open(cert_path, 'rb') as ta_cert_fp:
        ta_cert_buf = ta_cert_fp.read()
    ta_cert_size = len(ta_cert_buf)

    config_cert_path = os.path.join(CONFIG_CERT_PATH, "taconfig.der")
    with open(config_cert_path, 'rb') as config_cert_fp:
        config_cert_buf = config_cert_fp.read()
    config_cert_size = len(config_cert_buf)

    config_header, final_sign_size = gen_config_header(
        sign_conf_alg, config_content_size, ta_cert_size,
        config_cert_size, config_file)
    packed_header = config_header.get_packed_data()

    output_file = os.path.join(input_path, "temp/config_sign")
    gen_config_sign(sign_conf_alg, input_path, packed_header,
                    tlv_config_buf, ta_cert_buf, output_file)
    config_sign_size = os.path.getsize(output_file)
    # The ECDSA signature produced by openssl may be shorter than the fixed
    # size stored in the section; pad it in that case.
    if sign_conf_alg == "ECDSA" and config_sign_size != final_sign_size:
        pack_signature(output_file, config_sign_size)

    with open(output_file, 'rb') as config_sign_fp:
        config_sign_buf = config_sign_fp.read(final_sign_size)

    # Section layout: header, ta_cert, config content, signature, config cert.
    _write_private_file(config_section, packed_header, ta_cert_buf,
                        tlv_config_buf, config_sign_buf, config_cert_buf)

    delete_temp_folder(input_path)


def main(argv):
    """Validate the command line in ``argv`` and generate the config section.

    Returns the process exit status: 0 on success, 1 on invalid arguments.
    """
    if len(argv) < 4:
        logging.error("usage: %s <ta_input_path> <ta_cert_path> "
                      "<ta_config_section>", argv[0] if argv else "")
        return 1
    ta_input_path = argv[1]
    ta_cert_path = argv[2]
    ta_config_section = argv[3]

    if not os.path.exists(ta_input_path):
        logging.error("ta_input_path does not exist.")
        return 1
    if not os.path.exists(ta_cert_path):
        logging.error("ta_cert_path does not exist.")
        return 1

    if whitelist_check(ta_input_path):
        logging.error("ta_input_path is incorrect.")
        return 1
    if whitelist_check(ta_cert_path):
        logging.error("ta_cert_path is incorrect.")
        return 1
    if whitelist_check(ta_config_section):
        logging.error("ta_config_section is incorrect.")
        return 1

    gen_config_section(ta_input_path, ta_cert_path, ta_config_section)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))