• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2021-2022 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import argparse
22import re
23import shutil
24import subprocess
25import sys
26import platform
27from collections import defaultdict
28
29
def parse_args():
    """
    Parse the command line options of this script.
    Every option is mandatory.
    :return: argparse namespace with dst_dir, tool_path, policy_file,
             source_root_dir, policy_dir_list and components
    """
    required_options = (
        ('--dst-dir', 'the output dest path'),
        ('--tool-path', 'the sefcontext_compile bin path'),
        ('--policy-file', 'the policy.31 file'),
        ('--source-root-dir', 'prj root path'),
        ('--policy_dir_list', 'policy dirs need to be included'),
        ('--components', 'system or vendor or default'),
    )
    parser = argparse.ArgumentParser()
    for option, description in required_options:
        parser.add_argument(option, help=description, required=True)
    return parser.parse_args()
45
46
def run_command(in_cmd):
    """
    Join the command words with single spaces and execute the resulting
    string through the shell.
    :param in_cmd: list of command words; the joined string is interpreted by the shell
    :return:
    :raises Exception: carrying the exit status when the command exits non-zero
    """
    completed = subprocess.run(" ".join(in_cmd), shell=True)
    if completed.returncode != 0:
        raise Exception(completed.returncode)
52
53
def traverse_folder_in_type(search_dir_list, file_suffix):
    """
    Walk every folder in search_dir_list and collect the files whose name
    ends with file_suffix. Each collected file is format-checked with
    check_contexts_file.
    :param search_dir_list: list of paths to search
    :param file_suffix: postfix of file name
    :return: the sorted file paths joined by a single space
    :raises Exception: when at least one collected file fails the format check
    """
    check_failed = 0
    matched_paths = []

    for search_dir in search_dir_list:
        for parent, _, names in sorted(os.walk(search_dir)):
            for name in names:
                # NOTE: this is a suffix match, so any file name that merely
                # ends with file_suffix is collected as well.
                if not name.endswith(file_suffix):
                    continue
                full_path = os.path.join(parent, name)
                # Keep checking the remaining files so every problem is reported.
                check_failed |= check_contexts_file(full_path)
                matched_paths.append(full_path)

    if check_failed:
        raise Exception(check_failed)
    return " ".join(str(path) for path in sorted(matched_paths))
76
77
def check_contexts_file(contexts_file):
    """
    Check the format of a contexts file: the last line must contain a
    newline and no line may end with a carriage return.
    A message is printed for each kind of violation found.
    :param contexts_file: path of the contexts file
    :return: 0 when the file is empty or passes both checks, 1 otherwise
    """
    # Binary mode keeps the original line endings visible.
    with open(contexts_file, 'rb') as fp:
        raw_lines = fp.readlines()
    if not raw_lines:
        return 0

    result = 0
    if b'\n' not in raw_lines[-1]:
        print("".join((contexts_file, " : need an empty line at end \n")))
        result = 1
    if any(raw.endswith((b'\r\n', b'\r')) for raw in raw_lines):
        print("".join((contexts_file, " : must be unix format\n")))
        result = 1
    return result
100
101
def combine_contexts_file(file_contexts_list, combined_file_contexts):
    """
    Concatenate the given contexts files and write a copy with comment and
    empty lines removed.
    Two files are produced: combined_file_contexts + "_tmp" holds the raw
    concatenation, combined_file_contexts holds the filtered result.
    The work is done in-process, so an empty file list or an input that
    contains only comments yields empty output files instead of an error.
    :param file_contexts_list: whitespace separated string of file paths
                               (an iterable of paths is accepted as well)
    :param combined_file_contexts: path of the filtered output file
    :return:
    :raises OSError: when an input file cannot be read or an output file
                     cannot be written
    """
    if isinstance(file_contexts_list, str):
        source_paths = file_contexts_list.split()
    else:
        source_paths = list(file_contexts_list)

    tmp_path = combined_file_contexts + "_tmp"
    # Work on bytes so the content is copied verbatim regardless of encoding.
    with open(tmp_path, 'wb') as tmp_write:
        for source_path in source_paths:
            with open(source_path, 'rb') as source_read:
                shutil.copyfileobj(source_read, tmp_write)

    with open(tmp_path, 'rb') as tmp_read, open(combined_file_contexts, 'wb') as combined_write:
        for line in tmp_read:
            # Only lines with '#' in the first column count as comments;
            # indented ones are kept.
            if line.startswith(b'#') or line in (b'\n', b''):
                continue
            if not line.endswith(b'\n'):
                # Always terminate the last line.
                line += b'\n'
            combined_write.write(line)
113
114
def check_redefinition(contexts_file):
    """
    Check that every entry of a contexts file is well formed and that no
    entry name is defined more than once.
    Comment lines and blank lines are ignored. Every problem is printed
    before the exception is raised.
    :param contexts_file: path of the contexts file
    :return:
    :raises Exception: when a line does not match "* u:object_r:*:s0" or
                       when an entry name appears on more than one line
    """
    entry_pattern = re.compile(r'(\S+)\s+u:object_r:\S+:s0')
    occurrences = defaultdict(list)
    malformed = 0
    with open(contexts_file, 'r') as contexts_read:
        for line_no, raw_line in enumerate(contexts_read, start=1):
            stripped = raw_line.lstrip()
            if stripped.startswith('#') or stripped.strip() == '':
                continue
            found = entry_pattern.match(stripped)
            if found is None:
                print(contexts_file + ":" +
                      str(line_no) + " format check fail")
                malformed = 1
                continue
            occurrences[found.group(1)].append(line_no)

    if malformed:
        print("***********************************************************")
        print("please check whether the format meets the following rules:")
        print("[required format]: * u:object_r:*:s0")
        print("***********************************************************")
        raise Exception(malformed)

    redefined = 0
    for entry_name, line_numbers in occurrences.items():
        if len(line_numbers) <= 1:
            continue
        redefined = 1
        # Message layout: <file>:<line>:<line>:...'type <name> is redefinition'
        locations = "".join(str(number) + ":" for number in line_numbers)
        print("".join((contexts_file, ":", locations,
                       "'type ", str(entry_name), " is redefinition'")))
    if redefined:
        raise Exception(redefined)
154
155
def check_common_contexts(args, contexts_file):
    """
    Validate a combined contexts file: run check_redefinition on it, then
    compile it with sefcontext_compile against args.policy_file.
    The compiled ".bin" output is only a by-product of the check and is
    removed afterwards.
    :param args: parsed command line options (tool_path and policy_file are used)
    :param contexts_file: path of contexts file
    :return:
    """
    check_redefinition(contexts_file)

    compiled_bin = contexts_file + ".bin"
    compiler = os.path.join(args.tool_path, "sefcontext_compile")
    run_command([compiler,
                 "-o", compiled_bin,
                 "-p", args.policy_file,
                 contexts_file])
    if os.path.exists(compiled_bin):
        os.unlink(compiled_bin)
172
173
def echo_error():
    """
    Print the help text that describes the required sehap_contexts line format.
    :return:
    """
    help_lines = (
        "***********************************************************",
        "please check whether the format meets the following rules:",
        "[required format]: apl=* [name=*] [debuggable=*] [extension=*] [extra=*] domain=* type=*",
        "apl=*, apl should be one of system_core|system_basic|normal",
        "name=*, name is 'optional'",
        "debuggable=*, debuggable is 'optional'",
        "extension=*, extension is 'optional'",
        "extra=*, extra is 'optional'",
        "domain=*, hapdomain selinux type",
        "type=*, hapdatafile selinux type",
        "***********************************************************",
    )
    for help_line in help_lines:
        print(help_line)
186
187
def sehap_check_line(line, line_index, contexts_write, domain, contexts_file):
    """
    Validate one sehap_contexts line and write its checkable form.
    Comment and blank lines are written through unchanged. An entry line is
    rewritten as "<apl> u:r:<domain>:s0" when domain is truthy, otherwise
    as "<apl> u:object_r:<type>:s0".
    :param line: the raw line, including its line ending
    :param line_index: line number used in the error message
    :param contexts_write: writable text stream receiving the result
    :param domain: true to emit the domain= value, false to emit the type= value
    :param contexts_file: file name used in the error message
    :return:
    :raises Exception: when an entry line does not match the expected format
    """
    stripped = line.lstrip()
    if stripped.strip() == '' or stripped.startswith('#'):
        contexts_write.write(line)
        return

    # NOTE(review): this accepts at most one of name=/debuggable= and expects
    # extra= before extension=; confirm which field combinations and order
    # are really intended.
    entry_pattern = re.compile(
        r'apl=(system_core|system_basic|normal)\s+'
        r'((name|debuggable)=\S+\s+)?'
        r'(extra=\S+\s+)?'
        r'(extension=\S+\s+)?'
        r'domain=(\S+)\s+'
        r'type=(\S+)\s*'
        r'.*\n',
        re.DOTALL
    )
    parsed = entry_pattern.match(stripped)
    if not parsed:
        print(contexts_file + ":" + str(line_index) + " format check fail")
        raise Exception(1)

    apl = parsed.group(1)
    # Group 6 is the domain= value, group 7 the type= value.
    if domain:
        rewritten = apl + " u:r:" + parsed.group(6) + ":s0\n"
    else:
        rewritten = apl + " u:object_r:" + parsed.group(7) + ":s0\n"
    contexts_write.write(rewritten)
214
215
def check_sehap_contexts(args, contexts_file, domain):
    """
    check domain or type defined in sehap_contexts.
    The file is temporarily rewritten line by line with sehap_check_line,
    compiled with sefcontext_compile against args.policy_file, and then
    restored from its backup copy.
    :param args: parsed command line options (tool_path and policy_file are used)
    :param contexts_file: path of contexts file
    :param domain: true for domain, false for type
    :return:
    :raises Exception: when a line fails the format check or when
                       sefcontext_compile exits with a non-zero status
    """
    backup_file = contexts_file + "_bk"
    compiled_bin = contexts_file + ".bin"
    shutil.copyfile(contexts_file, backup_file)
    try:
        try:
            with open(backup_file, 'r') as contexts_read, open(contexts_file, 'w') as contexts_write:
                for line_index, line in enumerate(contexts_read, start=1):
                    sehap_check_line(line, line_index, contexts_write, domain, contexts_file)
        except Exception:
            echo_error()
            raise

        check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
                     "-o", compiled_bin,
                     "-p", args.policy_file,
                     contexts_file]
        ret = subprocess.run(" ".join(check_cmd), shell=True).returncode
        if ret != 0:
            raise Exception(ret)
    finally:
        # Restore the original content on every exit path, including
        # interruptions, and drop the compile by-product.
        shutil.move(backup_file, contexts_file)
        if os.path.exists(compiled_bin):
            os.unlink(compiled_bin)
250
251
def build_file_contexts(args, output_path, policy_path, all_policy_path):
    """
    Combine the file_contexts files and compile file_contexts.bin.
    For the "default" component only the combination of all_policy_path is
    written, as "file_contexts". Otherwise the component specific
    combination is written as "file_contexts" and the combination of
    all_policy_path as "all_file_contexts". The combination of
    all_policy_path is the one that is checked and compiled.
    :param args: parsed command line options
    :param output_path: folder receiving the combined files
    :param policy_path: policy folders of the selected component
    :param all_policy_path: policy folders of all components
    :return:
    """
    contexts_name = "file_contexts"
    if args.components != "default":
        component_file_list = traverse_folder_in_type(policy_path, contexts_name)
        combine_contexts_file(component_file_list,
                              os.path.join(output_path, "file_contexts"))
        all_combined_file_contexts = os.path.join(output_path, "all_file_contexts")
    else:
        all_combined_file_contexts = os.path.join(output_path, "file_contexts")

    combine_contexts_file(traverse_folder_in_type(all_policy_path, contexts_name),
                          all_combined_file_contexts)
    check_redefinition(all_combined_file_contexts)

    compiler = os.path.join(args.tool_path, "sefcontext_compile")
    compiled_bin = os.path.join(args.dst_dir, "file_contexts.bin")
    run_command([compiler,
                 "-o", compiled_bin,
                 "-p", args.policy_file,
                 all_combined_file_contexts])
272
273
def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path):
    """
    Combine and check the contexts files named contexts_file_name.
    For the "default" component only the combination of all_policy_path is
    written, as contexts_file_name. Otherwise the component specific
    combination is written as contexts_file_name and the combination of
    all_policy_path as "all_" + contexts_file_name. The combination of
    all_policy_path is the one passed to check_common_contexts.
    :param args: parsed command line options
    :param output_path: folder receiving the combined files
    :param contexts_file_name: name of the contexts files to collect
    :param policy_path: policy folders of the selected component
    :param all_policy_path: policy folders of all components
    :return:
    """
    # os.path.join keeps the files inside output_path even when it has no
    # trailing separator; with a trailing separator the result is unchanged.
    if args.components == "default":
        all_combined_contexts = os.path.join(output_path, contexts_file_name)
    else:
        all_combined_contexts = os.path.join(output_path, "all_" + contexts_file_name)
        contexts_list = traverse_folder_in_type(policy_path, contexts_file_name)
        combined_contexts = os.path.join(output_path, contexts_file_name)
        combine_contexts_file(contexts_list, combined_contexts)

    all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name)
    combine_contexts_file(all_contexts_list, all_combined_contexts)
    check_common_contexts(args, all_combined_contexts)
286
287
def build_sehap_contexts(args, output_path, policy_path):
    """
    Combine all sehap_contexts files into output_path and check the result
    twice: first the domain entries, then the type entries.
    :param args: parsed command line options
    :param output_path: folder receiving the combined file
    :param policy_path: policy folders to search
    :return:
    """
    combined_contexts = os.path.join(output_path, "sehap_contexts")
    combine_contexts_file(traverse_folder_in_type(policy_path, "sehap_contexts"),
                          combined_contexts)

    # 1 selects the domain check, 0 the type check.
    for domain_flag in (1, 0):
        check_sehap_contexts(args, combined_contexts, domain_flag)
297
298
def prepare_build_path(dir_list, root_dir, build_dir_list):
    """
    Resolve the policy folders to build and append them to build_dir_list.
    The two built-in selinux_adapter policy folders always come first,
    followed by the entries of dir_list. Empty entries and the literal
    "default" are skipped.
    :param dir_list: ':' separated folders, relative to root_dir
    :param root_dir: project root path
    :param build_dir_list: output list, extended in place with the resolved paths
    :return:
    :raises SystemExit: with code -1 when a resolved path does not exist
    """
    build_contexts_list = \
        ["base/security/selinux_adapter/sepolicy/base", "base/security/selinux_adapter/sepolicy/ohos_policy"]
    build_contexts_list += dir_list.split(":")

    for relative_dir in build_contexts_list:
        if relative_dir in ("", "default"):
            continue
        path = os.path.join(root_dir, relative_dir)
        if not os.path.exists(path):
            print("following path not exists!! {}".format(path))
            # sys.exit instead of the interactive helper exit(), which is
            # only installed by the site module.
            sys.exit(-1)
        build_dir_list.append(path)
313
314
def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Find every folder below search_dir whose name equals folder_suffix.
    :param search_dir: path to search
    :param folder_suffix: folder name to look for (compared for equality)
    :return: list of matching folder paths, ordered by their parent folder
    """
    return [
        os.path.join(parent, child)
        for parent, children, _ in sorted(os.walk(search_dir))
        for child in children
        if child == folder_suffix
    ]
322
323
def main(args):
    """
    Build all contexts files for the requested component.
    :param args: parsed command line options
    :return:
    """
    # LD_LIBRARY_PATH is replaced with the pcre2 folder matching the host;
    # the relative path is resolved against the current working directory.
    pcre2_dir = "./clang_x64/thirdparty/pcre2/"
    if sys.platform == "linux" and platform.machine().lower() == "aarch64":
        pcre2_dir = "./clang_arm64/thirdparty/pcre2/"
    os.environ['LD_LIBRARY_PATH'] = os.path.realpath(pcre2_dir)

    output_path = args.dst_dir
    policy_path = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path)

    public_policy, system_policy, vendor_policy = [], [], []
    for item in policy_path:
        public_policy.extend(traverse_folder_in_dir_name(item, "public"))
        system_policy.extend(traverse_folder_in_dir_name(item, "system"))
        vendor_policy.extend(traverse_folder_in_dir_name(item, "vendor"))

    all_folder_list = public_policy + system_policy + vendor_policy
    folders_by_component = {
        "system": public_policy + system_policy,
        "vendor": public_policy + vendor_policy,
    }
    # Any other component value builds from all folders.
    folder_list = folders_by_component.get(args.components, all_folder_list)

    build_file_contexts(args, output_path, folder_list, all_folder_list)
    for contexts_file_name in ("service_contexts", "hdf_service_contexts", "parameter_contexts"):
        build_common_contexts(args, output_path, contexts_file_name, folder_list, all_folder_list)
    build_sehap_contexts(args, output_path, all_folder_list)
361
362
if __name__ == "__main__":
    # Script entry point: parse the command line and run the build.
    main(parse_args())
366