• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2021-2022 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import argparse
22import re
23import shutil
24import subprocess
25from collections import defaultdict
26
27
28def parse_args():
29    parser = argparse.ArgumentParser()
30    parser.add_argument(
31        '--dst-dir', help='the output dest path', required=True)
32    parser.add_argument('--tool-path',
33                        help='the sefcontext_compile bin path', required=True)
34    parser.add_argument('--policy-file',
35                        help='the policy.31 file', required=True)
36    parser.add_argument('--source-root-dir',
37                        help='prj root path', required=True)
38    parser.add_argument('--policy_dir_list',
39                        help='policy dirs need to be included', required=True)
40    parser.add_argument('--components',
41                        help='system or vendor or default', required=True)
42    return parser.parse_args()
43
44
45def run_command(in_cmd):
46    cmdstr = " ".join(in_cmd)
47    ret = subprocess.run(cmdstr, shell=True).returncode
48    if ret != 0:
49        raise Exception(ret)
50
51
52def traverse_folder_in_type(search_dir_list, file_suffix):
53    """
54    for special folder search_dir, find all files endwith file_suffix.
55    :param search_dir: path to search
56    :param file_suffix: postfix of file name
57    :return: file list
58    """
59    flag = 0
60    policy_file_list = []
61    for item in search_dir_list:
62        for root, _, files in os.walk(item):
63            for each_file in files:
64                file_name = os.path.basename(each_file)
65                if file_name == file_suffix:
66                    file_list_path = os.path.join(root, each_file)
67                    flag |= check_contexts_file(file_list_path)
68                    policy_file_list.append(file_list_path)
69    if flag:
70        raise Exception(flag)
71    policy_file_list.sort()
72    return " ".join(str(x) for x in policy_file_list)
73
74
75def check_contexts_file(contexts_file):
76    """
77    Check the format of contexts file.
78    :param contexts_file: list of te file
79    :return:
80    """
81    err = 0
82    lines = []
83    with open(contexts_file, 'rb') as fp:
84        lines = fp.readlines()
85    if len(lines) == 0:
86        return 0
87    last_line = lines[-1]
88    if b'\n' not in last_line:
89        print("".join((contexts_file, " : need an empty line at end \n")))
90        err = 1
91    for line in lines:
92        if line.endswith(b'\r\n') or line.endswith(b'\r'):
93            print("".join((contexts_file, " : must be unix format\n")))
94            err = 1
95            break
96    return err
97
98
99def combine_contexts_file(file_contexts_list, combined_file_contexts):
100    cat_cmd = ["cat",
101               file_contexts_list,
102               ">", combined_file_contexts + "_tmp"]
103    run_command(cat_cmd)
104
105    grep_cmd = ["grep -v ^#",
106                combined_file_contexts + "_tmp",
107                "| grep -v ^$",
108                ">", combined_file_contexts]
109    run_command(grep_cmd)
110
111
112def check_redefinition(contexts_file):
113    type_hash = defaultdict(list)
114    err = 0
115    with open(contexts_file, 'r') as contexts_read:
116        pattern = re.compile(r'(\S+)\s+u:object_r:\S+:s0')
117        line_index = 0
118        for line in contexts_read:
119            line_ = line.lstrip()
120            line_index += 1
121            if line_.startswith('#') or line_.strip() == '':
122                continue
123            match = pattern.match(line_)
124            if match:
125                type_hash[match.group(1)].append(line_index)
126            else:
127                print(contexts_file + ":" +
128                      str(line_index) + " format check fail")
129                err = 1
130        contexts_read.close()
131    if err:
132        print("***********************************************************")
133        print("please check whether the format meets the following rules:")
134        print("[required format]: * u:object_r:*:s0")
135        print("***********************************************************")
136        raise Exception(err)
137    err = 0
138    for type_key in type_hash.keys():
139        if len(type_hash[type_key]) > 1:
140            err = 1
141            str_seq = (contexts_file, ":")
142            err_msg = "".join(str_seq)
143            for linenum in type_hash[type_key]:
144                str_seq = (err_msg, str(linenum), ":")
145                err_msg = "".join(str_seq)
146            str_seq = (err_msg, "'type ", str(type_key), " is redefinition'")
147            err_msg = "".join(str_seq)
148            print(err_msg)
149    if err:
150        raise Exception(err)
151
152
153def check_common_contexts(args, contexts_file):
154    """
155    check whether context used in contexts_file is defined in policy.31.
156    :param args:
157    :param contexts_file: path of contexts file
158    :return:
159    """
160    check_redefinition(contexts_file)
161
162    check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
163                 "-o", contexts_file + ".bin",
164                 "-p", args.policy_file,
165                 contexts_file]
166    run_command(check_cmd)
167    if os.path.exists(contexts_file + ".bin"):
168        os.unlink(contexts_file + ".bin")
169
170
171def echo_error():
172    print("***********************************************************")
173    print("please check whether the format meets the following rules:")
174    print("[required format]: apl=* name=* domain=* type=*")
175    print("apl=*, apl should be one of system_core|system_basic|normal")
176    print("name=*, name is 'optional'")
177    print("domain=*, hapdomain selinux type")
178    print("type=*, hapdatafile selinux type")
179    print("***********************************************************")
180
181
182def check_sehap_contexts(args, contexts_file, domain):
183    """
184    check domain or type defined in sehap_contexts.
185    :param args:
186    :param contexts_file: path of contexts file
187    :param domain: true for domain, false for type
188    :return:
189    """
190    shutil.copyfile(contexts_file, contexts_file + "_bk")
191    err = 0
192    with open(contexts_file + "_bk", 'r') as contexts_read, open(contexts_file, 'w') as contexts_write:
193        pattern = re.compile(
194            r'apl=(system_core|system_basic|normal)\s+((name|debuggable)=\S+\s+)?domain=(\S+)\s+type=(\S+)\s*\n')
195        line_index = 0
196        for line in contexts_read:
197            line_ = line.lstrip()
198            line_index += 1
199            if line_.startswith('#') or line_.strip() == '':
200                contexts_write.write(line)
201                continue
202            match = pattern.match(line_)
203            if match:
204                if domain:
205                    line = match.group(1) + " u:r:" + match.group(4) + ":s0\n"
206                else:
207                    line = match.group(1) + " u:object_r:" + \
208                        match.group(5) + ":s0\n"
209                contexts_write.write(line)
210            else:
211                print(contexts_file + ":" +
212                      str(line_index) + " format check fail")
213                err = 1
214    if err:
215        shutil.move(contexts_file + "_bk", contexts_file)
216        echo_error()
217        raise Exception(err)
218    check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
219                 "-o", contexts_file + ".bin",
220                 "-p", args.policy_file,
221                 contexts_file]
222    ret = subprocess.run(" ".join(check_cmd), shell=True).returncode
223    if ret != 0:
224        shutil.move(contexts_file + "_bk", contexts_file)
225        raise Exception(ret)
226    shutil.move(contexts_file + "_bk", contexts_file)
227    if os.path.exists(contexts_file + ".bin"):
228        os.unlink(contexts_file + ".bin")
229
230
231def build_file_contexts(args, output_path, policy_path, all_policy_path):
232    if args.components == "default":
233        all_combined_file_contexts = os.path.join(output_path, "file_contexts")
234    else:
235        all_combined_file_contexts = os.path.join(output_path, "all_file_contexts")
236        file_contexts_list = traverse_folder_in_type(policy_path, "file_contexts")
237        combined_file_contexts = os.path.join(output_path, "file_contexts")
238        combine_contexts_file(file_contexts_list, combined_file_contexts)
239
240    all_file_contexts_list = traverse_folder_in_type(
241        all_policy_path, "file_contexts")
242    combine_contexts_file(all_file_contexts_list, all_combined_file_contexts)
243
244    check_redefinition(all_combined_file_contexts)
245
246    build_bin_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
247                     "-o", os.path.join(args.dst_dir, "file_contexts.bin"),
248                     "-p", args.policy_file,
249                     all_combined_file_contexts]
250    run_command(build_bin_cmd)
251
252
253def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path):
254    if args.components == "default":
255        all_combined_contexts = output_path + contexts_file_name
256    else:
257        all_combined_contexts = output_path + "all_" + contexts_file_name
258        contexts_list = traverse_folder_in_type(policy_path, contexts_file_name)
259        combined_contexts = output_path + contexts_file_name
260        combine_contexts_file(contexts_list, combined_contexts)
261
262    all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name)
263    combine_contexts_file(all_contexts_list, all_combined_contexts)
264    check_common_contexts(args, all_combined_contexts)
265
266
267def build_sehap_contexts(args, output_path, policy_path):
268    contexts_list = traverse_folder_in_type(
269        policy_path, "sehap_contexts")
270
271    combined_contexts = os.path.join(output_path, "sehap_contexts")
272    combine_contexts_file(contexts_list, combined_contexts)
273
274    check_sehap_contexts(args, combined_contexts, 1)
275    check_sehap_contexts(args, combined_contexts, 0)
276
277
278def prepare_build_path(dir_list, root_dir, build_dir_list):
279    build_contexts_list = ["base/security/selinux_adapter/sepolicy/base", "base/security/selinux_adapter/sepolicy/ohos_policy"]
280    build_contexts_list += dir_list.split(":")
281
282    for i in build_contexts_list:
283        if i == "" or i == "default":
284            continue
285        path = os.path.join(root_dir, i)
286        if (os.path.exists(path)):
287            build_dir_list.append(path)
288        else:
289            print("following path not exists!! {}".format(path))
290            exit(-1)
291
292
293def traverse_folder_in_dir_name(search_dir, folder_suffix):
294    folder_list = []
295    for root, dirs, _ in os.walk(search_dir):
296        for dir_i in dirs:
297            if dir_i == folder_suffix:
298                folder_list.append(os.path.join(root, dir_i))
299    return folder_list
300
301
302def main(args):
303    libpcre2_path = os.path.realpath("./clang_x64/thirdparty/pcre2/")
304    os.environ['LD_LIBRARY_PATH'] = libpcre2_path
305    output_path = args.dst_dir
306    policy_path = []
307    prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path)
308
309    public_policy = []
310    system_policy = []
311    vendor_policy = []
312
313    for item in policy_path:
314        public_policy += traverse_folder_in_dir_name(item, "public")
315        system_policy += traverse_folder_in_dir_name(item, "system")
316        vendor_policy += traverse_folder_in_dir_name(item, "vendor")
317
318    system_folder_list = public_policy + system_policy
319    vendor_folder_list = public_policy + vendor_policy
320    all_folder_list = public_policy + system_policy + vendor_policy
321
322    folder_list = []
323
324    if args.components == "system":
325        folder_list = system_folder_list
326    elif args.components == "vendor":
327        folder_list = vendor_folder_list
328    else:
329        folder_list = all_folder_list
330
331    build_file_contexts(args, output_path, folder_list, all_folder_list)
332    build_common_contexts(args, output_path, "service_contexts", folder_list, all_folder_list)
333    build_common_contexts(args, output_path, "hdf_service_contexts", folder_list, all_folder_list)
334    build_common_contexts(args, output_path, "parameter_contexts", folder_list, all_folder_list)
335    build_sehap_contexts(args, output_path, all_folder_list)
336
337
338if __name__ == "__main__":
339    input_args = parse_args()
340    main(input_args)
341