• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2021-2022 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import argparse
22import re
23import shutil
24import subprocess
25from collections import defaultdict
26
27
def parse_args():
    """
    Parse the command line options of the contexts build script.
    Every option listed below is mandatory.
    :return: the argparse namespace holding the parsed options
    """
    # (option flag, help text) pairs, registered in this order
    option_table = (
        ('--dst-dir', 'the output dest path'),
        ('--tool-path', 'the sefcontext_compile bin path'),
        ('--policy-file', 'the policy.31 file'),
        ('--source-root-dir', 'prj root path'),
        ('--policy_dir_list', 'policy dirs need to be included'),
        ('--components', 'system or vendor or default'),
    )
    arg_parser = argparse.ArgumentParser()
    for option_flag, description in option_table:
        arg_parser.add_argument(option_flag, help=description, required=True)
    return arg_parser.parse_args()
43
44
def run_command(in_cmd):
    """
    Run a command line through the shell and fail on a non-zero exit status.
    :param in_cmd: sequence of string fragments; they are joined with single
                   spaces into one command line, so redirections and pipes
                   contained in the fragments are interpreted by the shell
    :return:
    :raises Exception: carries the exit status when it is not 0
    """
    completed = subprocess.run(" ".join(in_cmd), shell=True)
    if completed.returncode != 0:
        raise Exception(completed.returncode)
50
51
def traverse_folder_in_type(search_dir_list, file_suffix):
    """
    Collect all files below the given folders whose name ends with file_suffix.
    Every hit is also passed to check_contexts_file().
    :param search_dir_list: folders to walk recursively
    :param file_suffix: postfix of file name
    :return: the sorted file paths joined into one space separated string
    :raises Exception: when at least one collected file fails the format check
    """
    found_files = []
    check_failed = 0

    for search_dir in search_dir_list:
        for folder, _, names in sorted(os.walk(search_dir)):
            for name in names:
                # NOTE(review): this is a suffix match, so a suffix such as
                # "service_contexts" also selects files named
                # "hdf_service_contexts" - confirm this is intended.
                if not name.endswith(file_suffix):
                    continue
                candidate = os.path.join(folder, name)
                check_failed |= check_contexts_file(candidate)
                found_files.append(candidate)

    # all files are checked first so that every violation gets reported
    if check_failed:
        raise Exception(check_failed)
    return " ".join(str(path) for path in sorted(found_files))
74
75
def check_contexts_file(contexts_file):
    """
    Check the text format of one contexts file.
    The last line has to be newline terminated and no line may end with a
    carriage return; one message is printed per violated rule.
    :param contexts_file: path of the file to check
    :return: 0 when the file is empty or well formed, otherwise 1
    """
    with open(contexts_file, 'rb') as fp:
        raw_lines = fp.readlines()
    if not raw_lines:
        return 0

    result = 0
    if b'\n' not in raw_lines[-1]:
        print(contexts_file + " : need an empty line at end \n")
        result = 1
    # reported once per file, no matter how many lines are affected
    if any(raw.endswith((b'\r\n', b'\r')) for raw in raw_lines):
        print(contexts_file + " : must be unix format\n")
        result = 1
    return result
98
99
def combine_contexts_file(file_contexts_list, combined_file_contexts):
    """
    Concatenate contexts files and strip comment lines and empty lines.
    :param file_contexts_list: input file paths as one space separated string
                               (the form returned by traverse_folder_in_type)
    :param combined_file_contexts: path of the filtered output file; the raw
                                   concatenation is written next to it with
                                   a "_tmp" suffix and is not removed here
    :return:
    :raises Exception: propagated from run_command when a shell command
                       exits with a non-zero status
    """
    raw_concat = combined_file_contexts + "_tmp"

    # step 1: plain concatenation of all inputs
    run_command(["cat", file_contexts_list, ">", raw_concat])

    # step 2: drop lines starting with '#' and empty lines
    run_command(["grep -v ^#", raw_concat,
                 "| grep -v ^$",
                 ">", combined_file_contexts])
111
112
def check_redefinition(contexts_file):
    """
    Check the line format of a contexts file and reject duplicated keys.
    Lines that are empty or start with '#' are skipped. Every other line has
    to start with "<key> u:object_r:<type>:s0"; the key is the first
    whitespace delimited token.
    :param contexts_file: path of contexts file
    :return:
    :raises Exception: when a line has a wrong format, or when the same key
                       shows up on more than one line
    """
    entry_pattern = re.compile(r'(\S+)\s+u:object_r:\S+:s0')
    lines_by_key = defaultdict(list)
    format_error = 0

    with open(contexts_file, 'r') as contexts_read:
        for line_no, raw_line in enumerate(contexts_read, start=1):
            entry = raw_line.lstrip()
            if entry.startswith('#') or not entry.strip():
                continue
            found = entry_pattern.match(entry)
            if found is None:
                print(contexts_file + ":" + str(line_no) + " format check fail")
                format_error = 1
                continue
            lines_by_key[found.group(1)].append(line_no)

    # format problems are reported for the whole file before giving up
    if format_error:
        print("***********************************************************")
        print("please check whether the format meets the following rules:")
        print("[required format]: * u:object_r:*:s0")
        print("***********************************************************")
        raise Exception(format_error)

    redefined = 0
    for key, line_numbers in lines_by_key.items():
        if len(line_numbers) < 2:
            continue
        redefined = 1
        # message layout: <file>:<line>:<line>:...:'type <key> is redefinition'
        locations = "".join(str(number) + ":" for number in line_numbers)
        print(contexts_file + ":" + locations +
              "'type " + str(key) + " is redefinition'")
    if redefined:
        raise Exception(redefined)
152
153
def check_common_contexts(args, contexts_file):
    """
    Validate a combined contexts file.
    The file first goes through check_redefinition() and is then compiled
    with sefcontext_compile against args.policy_file.
    :param args: parsed options; tool_path and policy_file are used
    :param contexts_file: path of contexts file
    :return:
    """
    check_redefinition(contexts_file)

    compiled_file = contexts_file + ".bin"
    compiler = os.path.join(args.tool_path, "sefcontext_compile")
    run_command([compiler,
                 "-o", compiled_file,
                 "-p", args.policy_file,
                 contexts_file])

    # the compiled output is only a by-product of the check, drop it again
    if os.path.exists(compiled_file):
        os.unlink(compiled_file)
170
171
def echo_error():
    """
    Print the format rules that a sehap_contexts line has to follow.
    :return:
    """
    rule_lines = (
        "***********************************************************",
        "please check whether the format meets the following rules:",
        "[required format]: apl=* name=* domain=* type=*",
        "apl=*, apl should be one of system_core|system_basic|normal",
        "name=*, name is 'optional'",
        "domain=*, hapdomain selinux type",
        "type=*, hapdatafile selinux type",
        "***********************************************************",
    )
    for rule_line in rule_lines:
        print(rule_line)
181
182
def sehap_process_line(line, line_index, contexts_write, domain, contexts_file):
    """
    Convert one sehap_contexts line and write the result.
    Comment lines and blank lines are copied unchanged. Any other line must
    look like "apl=<level> [name=...|debuggable=...] domain=<d> type=<t>"
    followed by a newline; it is written as "<level> u:r:<d>:s0" when domain
    is true, otherwise as "<level> u:object_r:<t>:s0".
    :param line: the raw input line
    :param line_index: 1-based line number, only used in the error message
    :param contexts_write: writable text stream receiving the output line
    :param domain: true to emit the domain, false to emit the type
    :param contexts_file: file name shown in the error message
    :return:
    :raises Exception: when the line does not match the required format
    """
    stripped = line.lstrip()
    if stripped.startswith('#') or not stripped.strip():
        contexts_write.write(line)
        return

    found = re.match(
        r'apl=(system_core|system_basic|normal)\s+((name|debuggable)=\S+\s+)?domain=(\S+)\s+type=(\S+)\s*\n',
        stripped)
    if found is None:
        print(contexts_file + ":" + str(line_index) + " format check fail")
        raise Exception(1)

    # group 1: apl level, group 4: domain value, group 5: type value
    if domain:
        converted = found.group(1) + " u:r:" + found.group(4) + ":s0\n"
    else:
        converted = found.group(1) + " u:object_r:" + found.group(5) + ":s0\n"
    contexts_write.write(converted)
201
def check_sehap_contexts(args, contexts_file, domain):
    """
    Check the domains or the types used in a sehap_contexts file.
    The file is copied to "<contexts_file>_bk", rewritten line by line through
    sehap_process_line() and compiled with sefcontext_compile against
    args.policy_file. The original content is moved back afterwards, whether
    the check passed or not.
    :param args: parsed options; tool_path and policy_file are used
    :param contexts_file: path of contexts file
    :param domain: true for domain, false for type
    :return:
    :raises Exception: when a line has a wrong format or the compile step
                       exits with a non-zero status
    """
    backup_file = contexts_file + "_bk"
    compiled_file = contexts_file + ".bin"

    shutil.copyfile(contexts_file, backup_file)
    try:
        with open(backup_file, 'r') as contexts_read, \
                open(contexts_file, 'w') as contexts_write:
            for line_index, line in enumerate(contexts_read, start=1):
                sehap_process_line(line, line_index, contexts_write,
                                   domain, contexts_file)
    except Exception as err:
        shutil.move(backup_file, contexts_file)
        echo_error()
        raise err

    compile_cmd = " ".join([os.path.join(args.tool_path, "sefcontext_compile"),
                            "-o", compiled_file,
                            "-p", args.policy_file,
                            contexts_file])
    status = subprocess.run(compile_cmd, shell=True).returncode

    # the original content is put back before the compile result is evaluated
    shutil.move(backup_file, contexts_file)
    if status != 0:
        raise Exception(status)

    if os.path.exists(compiled_file):
        os.unlink(compiled_file)
236
def build_file_contexts(args, output_path, policy_path, all_policy_path):
    """
    Combine the file_contexts files and compile them to file_contexts.bin.
    For components other than "default" the files found in policy_path are
    combined into "file_contexts" and those found in all_policy_path into
    "all_file_contexts"; for "default" only the latter combination is made
    and it is named "file_contexts". The binary is built from the combination
    of all_policy_path.
    :param args: parsed options; components, tool_path, policy_file and
                 dst_dir are used
    :param output_path: folder receiving the combined text files
    :param policy_path: policy folders of the selected component
    :param all_policy_path: policy folders of all components
    :return:
    """
    suffix = "file_contexts"

    if args.components != "default":
        component_list = traverse_folder_in_type(policy_path, suffix)
        combine_contexts_file(component_list,
                              os.path.join(output_path, "file_contexts"))
        all_combined = os.path.join(output_path, "all_file_contexts")
    else:
        all_combined = os.path.join(output_path, "file_contexts")

    combine_contexts_file(traverse_folder_in_type(all_policy_path, suffix),
                          all_combined)
    check_redefinition(all_combined)

    compiler = os.path.join(args.tool_path, "sefcontext_compile")
    binary_file = os.path.join(args.dst_dir, "file_contexts.bin")
    run_command([compiler,
                 "-o", binary_file,
                 "-p", args.policy_file,
                 all_combined])
257
258
def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path):
    """
    Combine and check the contexts files of one kind.
    For components other than "default" the files found in policy_path are
    combined into "<contexts_file_name>" and those found in all_policy_path
    into "all_<contexts_file_name>"; for "default" only the latter combination
    is made and it is named "<contexts_file_name>". The combination of
    all_policy_path is then passed to check_common_contexts().
    :param args: parsed options; components is used here
    :param output_path: folder receiving the combined files
    :param contexts_file_name: file name (suffix) to collect
    :param policy_path: policy folders of the selected component
    :param all_policy_path: policy folders of all components
    :return:
    """
    # os.path.join instead of plain string concatenation: the result is the
    # same when output_path ends with a separator, and the file no longer
    # ends up beside the output folder with a fused name when it does not.
    if args.components == "default":
        all_combined_contexts = os.path.join(output_path, contexts_file_name)
    else:
        all_combined_contexts = os.path.join(
            output_path, "all_" + contexts_file_name)
        contexts_list = traverse_folder_in_type(policy_path, contexts_file_name)
        combined_contexts = os.path.join(output_path, contexts_file_name)
        combine_contexts_file(contexts_list, combined_contexts)

    all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name)
    combine_contexts_file(all_contexts_list, all_combined_contexts)
    check_common_contexts(args, all_combined_contexts)
271
272
def build_sehap_contexts(args, output_path, policy_path):
    """
    Combine all sehap_contexts files and check the combined file twice,
    first for the domains and then for the types.
    :param args: parsed options, handed on to check_sehap_contexts
    :param output_path: folder receiving the combined "sehap_contexts" file
    :param policy_path: policy folders to search
    :return:
    """
    file_name = "sehap_contexts"
    combined_contexts = os.path.join(output_path, file_name)
    combine_contexts_file(traverse_folder_in_type(policy_path, file_name),
                          combined_contexts)

    # 1 selects the domain check, 0 the type check
    for domain_flag in (1, 0):
        check_sehap_contexts(args, combined_contexts, domain_flag)
282
283
def prepare_build_path(dir_list, root_dir, build_dir_list):
    """
    Resolve the policy folders to build and append them to build_dir_list.
    The two built-in selinux_adapter policy folders always come first,
    followed by the entries of dir_list. Empty entries and the entry
    "default" are skipped.
    :param dir_list: additional folders relative to root_dir, separated by ':'
    :param root_dir: project root the relative folders are joined to
    :param build_dir_list: list that receives the existing paths (modified
                           in place)
    :return:
    :raises SystemExit: with code -1 as soon as a folder does not exist
    """
    # local import: sys.exit replaces the site-provided exit() helper, which
    # is meant for the interactive shell and is missing under "python -S"
    import sys

    build_contexts_list = [
        "base/security/selinux_adapter/sepolicy/base",
        "base/security/selinux_adapter/sepolicy/ohos_policy",
    ]
    build_contexts_list += dir_list.split(":")

    for relative_dir in build_contexts_list:
        if relative_dir in ("", "default"):
            continue
        path = os.path.join(root_dir, relative_dir)
        if not os.path.exists(path):
            print("following path not exists!! {}".format(path))
            sys.exit(-1)
        build_dir_list.append(path)
298
299
def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Find all sub folders below search_dir whose name equals folder_suffix.
    :param search_dir: folder to walk recursively
    :param folder_suffix: exact folder name to look for
    :return: list of matching folder paths, ordered by their parent folder
    """
    matches = []
    for parent, sub_dirs, _ in sorted(os.walk(search_dir)):
        # names are unique inside one folder, so there is at most one hit
        if folder_suffix in sub_dirs:
            matches.append(os.path.join(parent, folder_suffix))
    return matches
307
308
def main(args):
    """
    Build and check all contexts files for the requested components.
    :param args: parsed command line options, see parse_args()
    :return:
    """
    # LD_LIBRARY_PATH is replaced (not extended) with the pcre2 folder,
    # resolved relative to the current working directory
    os.environ['LD_LIBRARY_PATH'] = os.path.realpath("./clang_x64/thirdparty/pcre2/")

    output_path = args.dst_dir
    policy_path = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path)

    public_policy, system_policy, vendor_policy = [], [], []
    for policy_dir in policy_path:
        public_policy += traverse_folder_in_dir_name(policy_dir, "public")
        system_policy += traverse_folder_in_dir_name(policy_dir, "system")
        vendor_policy += traverse_folder_in_dir_name(policy_dir, "vendor")

    all_folder_list = public_policy + system_policy + vendor_policy
    per_component = {
        "system": public_policy + system_policy,
        "vendor": public_policy + vendor_policy,
    }
    # every other components value builds from all folders
    folder_list = per_component.get(args.components, all_folder_list)

    build_file_contexts(args, output_path, folder_list, all_folder_list)
    for contexts_name in ("service_contexts",
                          "hdf_service_contexts",
                          "parameter_contexts"):
        build_common_contexts(args, output_path, contexts_name,
                              folder_list, all_folder_list)
    build_sehap_contexts(args, output_path, all_folder_list)
343
344
if __name__ == "__main__":
    # script entry point: parse the command line, then run the build
    main(parse_args())
348