• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding=utf-8
3
4#
5# Copyright (C) 2022 Huawei Technologies Co., Ltd.
6# Licensed under the Mulan PSL v2.
7# You can use this software according to the terms and conditions of the Mulan
8# PSL v2.
9# You may obtain a copy of Mulan PSL v2 at:
10#     http://license.coscl.org.cn/MulanPSL2
11# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
12# KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
13# NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14# See the Mulan PSL v2 for more details.
15#
16
17import struct
18import os
19import stat
20import sys
21import hashlib
22import shutil
23import re
24import xml.etree.ElementTree as ET
25import subprocess
26import configparser
27import logging
28
29CONFIG_VERSION = 1
30CONFIG_CERT_PATH = './config_cert'
31
32BASE_POLICY_VERSION_TEE = 0b001
33BASE_POLICY_VERSION_OH = 0b101
34
35XML2TLV_PARSE_TOOL_INDEX = 1
36#use java parse xml
37XML2TLV_JAR_VALUE = 0 << XML2TLV_PARSE_TOOL_INDEX
38#use python parse xml
39XML2TLV_PY_VALUE = 1 << XML2TLV_PARSE_TOOL_INDEX
40logging.basicConfig(level=logging.INFO)
41
42
43def get_policy_version(config_file):
44
45    if not os.path.exists(config_file):
46        base_policy_ver = BASE_POLICY_VERSION_TEE
47    else:
48        cfg = Configuration(config_file)
49        if Configuration.check_cfg_format(cfg):
50            sys.exit(1)
51        if cfg.policy == "0":
52            base_policy_ver = BASE_POLICY_VERSION_OH
53        else:
54            base_policy_ver = BASE_POLICY_VERSION_TEE
55    if os.path.isfile("./xml2tlv.jar"):
56        policy_ver = base_policy_ver | XML2TLV_JAR_VALUE
57    else:
58        policy_ver = base_policy_ver | XML2TLV_PY_VALUE
59    return policy_ver
60
61
62def integer_check(intput_str):
63    if not str(intput_str).isdigit():
64        return 1
65    return 0
66
67
68def whitelist_check(intput_str):
69    if not re.match(r"^[A-Za-z0-9\/\-_.]+$", intput_str):
70        return 1
71    return 0
72
73
74def run_cmd(command):
75    ret = subprocess.run(command, shell=False, check=True)
76    if ret.returncode != 0:
77        logging.error("run command failed.")
78        sys.exit(1)
79
80
81class LoadConfigHeader:
82    str = struct.Struct('IHHIIIIIIIII')
83
84    def __init__(self, data):
85        unpacked_data = (LoadConfigHeader.str).unpack(data.encode())
86        self.unpacked_data = unpacked_data
87        self.magic_num = unpacked_data[0]
88        self.version = unpacked_data[1]
89        self.policy_version = unpacked_data[2]
90        self.context_len = unpacked_data[3]
91        self.ta_cert_len = unpacked_data[4]
92        self.config_len = unpacked_data[5]
93        self.sign_len = unpacked_data[6]
94        self.cfg_cert_len = unpacked_data[7]
95        self.reserved1 = unpacked_data[8]
96        self.reserved2 = unpacked_data[8]
97        self.reserved3 = unpacked_data[8]
98        self.reserved4 = unpacked_data[8]
99
100    def get_packed_data(self):
101        values = [self.magic_num,
102                  self.version,
103                  self.policy_version,
104                  self.context_len,
105                  self.ta_cert_len,
106                  self.config_len,
107                  self.sign_len,
108                  self.cfg_cert_len,
109                  self.reserved1,
110                  self.reserved2,
111                  self.reserved3,
112                  self.reserved4,
113                 ]
114        return (LoadConfigHeader.str).pack(*values)
115
116
117def pkg_config_header(hdr_len, magic_num, version, policy_version,
118        context_len, ta_cert_len, config_len, sign_len, cfg_cert_len):
119    config_hd_len = hdr_len
120    config_hd = LoadConfigHeader('\0' * config_hd_len)
121    config_hd.magic_num = magic_num
122    config_hd.version = version
123    config_hd.policy_version = policy_version
124    config_hd.context_len = context_len
125    config_hd.ta_cert_len = ta_cert_len
126    config_hd.config_len = config_len
127    config_hd.sign_len = sign_len
128    config_hd.cfg_cert_len = cfg_cert_len
129    return config_hd
130
131
132#----------------------------------------------------------------------------
133# generate hash use SHA256
134#----------------------------------------------------------------------------
135def generate_sha256_hash(in_buf):
136    # Initialize a SHA256 object from the Python hash library
137    obj = hashlib.sha256()
138    # Set the input buffer and return the output digest
139    obj.update(in_buf)
140    return obj.digest()
141
142
143def gen_rsa_signature(sign_conf_alg, config_buf, input_path_gen, output_file):
144    if sign_conf_alg == "RSA_PSS":
145        pri_key = os.path.join(CONFIG_CERT_PATH, "taconfig_key.pem")
146        msg_file = os.path.join(input_path_gen, "temp/config_msg")
147        fd_msg_file = os.open(msg_file, os.O_WRONLY | os.O_CREAT, \
148            stat.S_IWUSR | stat.S_IRUSR)
149        msg_file_fp = os.fdopen(fd_msg_file, "wb")
150        msg_file_fp.write(config_buf)
151        msg_file_fp.close()
152
153        cmd = "openssl dgst -sign {} -sha256 -sigopt rsa_padding_mode:pss \
154                -sigopt rsa_pss_saltlen:-1 \
155                -out {} {}".format(pri_key, output_file, msg_file)
156        try:
157            subprocess.check_output(cmd.split(), shell=False)
158        except Exception:
159            logging.error("sign operation failed")
160            raise RuntimeError
161        logging.info("Sign Config with PSS Success")
162    else:
163        logging.error("Sign Config alg is not support!")
164        exit(0)
165    return
166
167
168def gen_ecdsa_signature(config_buf, input_path_gen, output_file):
169    msg_file = os.path.join(input_path_gen, "temp/config_msg")
170    fd_msg_file = os.open(msg_file, os.O_WRONLY | os.O_CREAT, \
171        stat.S_IWUSR | stat.S_IRUSR)
172    msg_file_fp = os.fdopen(fd_msg_file, "wb")
173    msg_file_fp.write(config_buf)
174    msg_file_fp.close()
175
176    pri_key = os.path.join(CONFIG_CERT_PATH, "taconfig_key.pem")
177    cmd = ["openssl", "dgst", "-sha256", "-sign", pri_key, \
178           "-out", output_file, msg_file]
179    run_cmd(cmd)
180    logging.critical('Sign Config Success')
181    return
182
183
184def gen_config_sign(sign_conf_alg, input_path_gen, header,
185                    config, ta_cert, output_file):
186    temp_file = os.path.join(input_path_gen, "temp/file_to_sign")
187    fd_temp = os.open(temp_file, os.O_WRONLY | os.O_CREAT, \
188        stat.S_IWUSR | stat.S_IRUSR)
189    temp_file_fp = os.fdopen(fd_temp, "wb")
190    temp_file_fp.write(header)
191    temp_file_fp.write(ta_cert)
192    temp_file_fp.write(config)
193    temp_file_fp.close()
194
195    temp_file_len = os.path.getsize(temp_file)
196    with open(temp_file, 'rb') as temp_file_fp:
197        config_buf = temp_file_fp.read(temp_file_len)
198
199    if sign_conf_alg == "ECDSA":
200        gen_ecdsa_signature(config_buf, input_path_gen, output_file)
201    else: # rsa
202        gen_rsa_signature(sign_conf_alg, config_buf,
203                          input_path_gen, output_file)
204    return
205
206
207def convert_xml2tlv(xml_file, tlv_file, input_path, config_file):
208    policy_ver = get_policy_version(config_file)
209    if (policy_ver & (1 << XML2TLV_PARSE_TOOL_INDEX)) == XML2TLV_JAR_VALUE:
210        cmd = ["java", "-jar", "xml2tlv.jar", xml_file, tlv_file]
211        run_cmd(cmd)
212        if os.path.isfile(tlv_file):
213            logging.info('convert xml to tlv success')
214    elif (policy_ver & (1 << XML2TLV_PARSE_TOOL_INDEX)) == XML2TLV_PY_VALUE:
215        csv_dir = os.path.realpath(os.path.join(os.getcwd(), './'))
216        tag_parse_dict_file_path = os.path.join(csv_dir, './tag_parse_dict.csv')
217
218        from dyn_conf_parser import parser_config_xml
219        parser_config_xml(xml_file, tag_parse_dict_file_path, \
220            tlv_file, input_path)
221        if os.path.isfile(tlv_file):
222            logging.info('convert xml to tlv success')
223        else:
224            logging.error('convert xml to tlv failed')
225            raise RuntimeError
226    else:
227        logging.error('invlid policy version')
228        raise RuntimeError
229
230
231def creat_temp_folder(input_path_creat):
232    temp_path = os.path.join(input_path_creat, 'temp')
233    if os.path.exists(temp_path):
234        shutil.rmtree(temp_path)
235
236    cmd = ["mkdir", temp_path]
237    run_cmd(cmd)
238    return
239
240
241def delete_temp_folder(input_path_delete):
242    if os.path.exists(input_path_delete + '/temp'):
243        shutil.rmtree(os.path.join(input_path_delete, "temp"))
244    if os.path.exists(input_path_delete + '/config_tlv'):
245        os.remove(os.path.join(input_path_delete, "config_tlv"))
246    return
247
248
249def check_dyn_perm(xml_config_file, input_path):
250    xml_tree = ET.parse(xml_config_file)
251    xml_root = xml_tree.getroot()
252    drv_perm = None
253    for child in xml_root.findall('drv_perm'):
254        if child != '':
255            drv_perm = child
256            if os.path.exists(os.path.join(input_path, 'temp')):
257                out_save_file = os.path.join(input_path, \
258                    'temp/configs_bak.xml')
259                xml_tree.write(out_save_file, encoding="utf-8")
260            xml_root.remove(child)
261    if drv_perm is not None:
262        newtree = ET.ElementTree(drv_perm)
263        if os.path.exists(os.path.join(input_path, 'temp')):
264            out_file = os.path.join(input_path, 'temp/dyn_perm.xml')
265            newtree.write(out_file, encoding="utf-8")
266        xml_tree.write(xml_config_file)
267        return 1
268    return 0
269
270
271def get_target_type_in_config(config_path, in_path):
272    tree = ET.parse(config_path)
273    drv_target_type = tree.find('./TA_Manifest_Info/target_type')
274    flag = os.O_RDWR | os.O_TRUNC | os.O_CREAT
275    mode = stat.S_IWUSR | stat.S_IRUSR
276    if drv_target_type is not None:
277        if drv_target_type.text == "1":
278            ans = "gpd.ta.dynConf:00000\n"
279            out_tlv = os.path.join(in_path, 'config_tlv')
280            with os.fdopen(os.open(out_tlv, flag, mode), 'w+') as conf:
281                conf.write(ans)
282
283
284class Configuration:
285    sign_alg = "RSA_PKCS1"
286    policy = "1"
287
288    def __init__(self, file_name):
289        parser = configparser.ConfigParser()
290        parser.read(file_name)
291        self.sign_alg = parser.get("signConfigPrivateCfg", "configSignAlg")
292        self.policy = parser.get("signConfigPrivateCfg", "configPolicy")
293
294    def check_cfg_format(self):
295        if whitelist_check(self.sign_alg):
296            logging.error("configSignAlg is invalid")
297            return 1
298        if whitelist_check(self.policy):
299            logging.error("signConfigPrivateCfg is invalid")
300            return 1
301        return 0
302
303def pack_signature(signature_path, signature_size):
304    add_size = 72 - signature_size
305    with open(signature_path, 'rb+') as signature_file:
306        signature_buf = signature_file.read(signature_size)
307        signature_file.seek(0)
308        for _ in range(0, add_size):
309            signature_file.write(b'\x00')
310        signature_file.write(signature_buf)
311
312
313def gen_config_header(sign_conf_alg, config_content_size, ta_cert_size, config_cert_size, config_file):
314    config_hd_len = 44
315    if sign_conf_alg == "ECDSA":
316        config_sign_size = 72 | 0xC0000000
317        final_sign_size = 72
318    else: # rsa
319        config_sign_size = 512
320        final_sign_size = 512
321        if sign_conf_alg == "RSA_PSS":
322            config_sign_size = config_sign_size | 0x80000000
323    config_context_size = config_content_size + ta_cert_size \
324            + config_sign_size + config_cert_size
325    config_header = pkg_config_header(config_hd_len, 0xABCDABCD, \
326            CONFIG_VERSION, get_policy_version(config_file), config_context_size, \
327            ta_cert_size, config_content_size, config_sign_size, \
328            config_cert_size)
329    return config_header, final_sign_size
330
331
332def get_tlv_buffer(input_path, config_file):
333    #convert xml to tlv
334    tlv_dynconf_data = os.path.join(input_path, "config_tlv")
335    xml_config_file = os.path.join(input_path, "configs.xml")
336    tlv_config_file = os.path.join(input_path, "temp/configs_tlv")
337    if check_dyn_perm(xml_config_file, input_path) != 0:
338        from dyn_conf_parser import parser_dyn_conf
339        dyn_conf_xml_file_path = os.path.join(input_path, 'temp/dyn_perm.xml')
340        csv_dir = os.path.realpath(os.path.join(os.getcwd(), './'))
341        tag_parse_dict_file_path = \
342            os.path.join(csv_dir, './tag_parse_dict.csv')
343        parser_dyn_conf(dyn_conf_xml_file_path, "", \
344            tag_parse_dict_file_path, input_path)
345        convert_xml2tlv(xml_config_file, tlv_config_file, \
346            input_path, config_file)
347
348        src_file_path = os.path.join(input_path, 'temp/configs_bak.xml')
349        cmd = ["mv", src_file_path, xml_config_file]
350        run_cmd(cmd)
351    else:
352        convert_xml2tlv(xml_config_file, tlv_config_file, \
353            input_path, config_file)
354        get_target_type_in_config(xml_config_file, input_path)
355
356    if os.path.exists(tlv_dynconf_data):
357        with open(tlv_config_file, 'rb') as tlv_config_fp:
358            tlv_config_buf = \
359                tlv_config_fp.read(os.path.getsize(tlv_config_file))
360        with open(tlv_dynconf_data, 'rb') as tlv_dynconf_fp:
361            tlv_config_buf = tlv_config_buf + \
362                tlv_dynconf_fp.read(os.path.getsize(tlv_dynconf_data)) + b"\n"
363        config_content_size = len(tlv_config_buf)
364    else:
365        config_content_size = os.path.getsize(tlv_config_file)
366        with open(tlv_config_file, 'rb') as tlv_config_fp:
367            tlv_config_buf = tlv_config_fp.read(config_content_size)
368    return tlv_config_buf, config_content_size
369
370
371def gen_config_section(input_path, cert_path, config_section):
372
373    creat_temp_folder(input_path)
374    config_path = input_path
375    config_file = os.path.join(config_path, "config_tee_private_sample.ini")
376    if not os.path.exists(config_file):
377        logging.critical("config_tee_private_sample.ini is not exist.")
378        sign_conf_alg = "RSA_PSS"
379    else:
380        cfg = Configuration(config_file)
381        if Configuration.check_cfg_format(cfg):
382            sys.exit(1)
383        sign_conf_alg = cfg.sign_alg
384
385    tlv_config_buf, config_content_size = get_tlv_buffer(input_path, config_file)
386    ta_cert_size = os.path.getsize(cert_path)
387    with open(cert_path, 'rb') as ta_cert_fp:
388        ta_cert_buf = ta_cert_fp.read(ta_cert_size)
389
390    config_cert_path = os.path.join(CONFIG_CERT_PATH, "taconfig.der")
391    config_cert_size = os.path.getsize(config_cert_path)
392    with open(config_cert_path, 'rb') as config_cert_fp:
393        config_cert_buf = config_cert_fp.read(config_cert_size)
394
395    config_header, final_sign_size = gen_config_header(sign_conf_alg, config_content_size, ta_cert_size, \
396                                    config_cert_size, config_file)
397    output_file = os.path.join(input_path, "temp/config_sign")
398    gen_config_sign(sign_conf_alg, input_path, \
399            config_header.get_packed_data(), \
400            tlv_config_buf, ta_cert_buf, output_file)
401    config_sign_size = os.path.getsize(output_file)
402    if sign_conf_alg == "ECDSA":
403        if config_sign_size != final_sign_size:
404            pack_signature(output_file, config_sign_size)
405
406    with open(output_file, 'rb') as config_sign_fp:
407        config_sign_buf = config_sign_fp.read(final_sign_size)
408
409    fd_config = os.open(config_section, os.O_WRONLY | os.O_CREAT, \
410        stat.S_IWUSR | stat.S_IRUSR)
411    config_section_fp = os.fdopen(fd_config, "wb")
412    #write config header
413    config_section_fp.write(config_header.get_packed_data())
414    #write ta_cert
415    config_section_fp.write(ta_cert_buf)
416    #write config content
417    config_section_fp.write(tlv_config_buf)
418    #write signature
419    config_section_fp.write(config_sign_buf)
420    #write config cert
421    config_section_fp.write(config_cert_buf)
422    config_section_fp.close()
423
424    delete_temp_folder(input_path)
425    return
426
427
428if __name__ == '__main__':
429    argv_data = sys.argv
430    ta_input_path = argv_data[1]
431    ta_cert_path = argv_data[2]
432    ta_config_section = argv_data[3]
433
434    if not os.path.exists(ta_input_path):
435        logging.error("ta_input_path does not exist.")
436        exit()
437    if not os.path.exists(ta_cert_path):
438        logging.error("ta_cert_path does not exist.")
439        exit()
440
441    if whitelist_check(ta_input_path):
442        logging.error("ta_input_path is incorrect.")
443        exit()
444    if whitelist_check(ta_cert_path):
445        logging.error("ta_cert_path is incorrect.")
446        exit()
447    if whitelist_check(ta_config_section):
448        logging.error("ta_config_section is incorrect.")
449        exit()
450    gen_config_section(ta_input_path, ta_cert_path, \
451            ta_config_section)