• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2023 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import re
22import shutil
23import subprocess
24import tempfile
25from concurrent.futures import ThreadPoolExecutor, as_completed
26import copy
27
# File names used for the generated sha256 digests (see generate_special_policy).
SYSTEM_CIL_HASH = "system.cil.sha256"
PREBUILD_SEPOLICY_SYSTEM_CIL_HASH = "prebuild_sepolicy.system.cil.sha256"

# File-name suffixes of every macro/te input of the sepolicy build.
# traverse_file_in_each_type walks this list front to back, so the position
# of a suffix decides where its files land in the collected file list.
SEPOLICY_TYPE_LIST = [
    "security_classes",
    "initial_sids",
    "access_vectors",
    "glb_perm_def.spt",
    "glb_never_def.spt",
    "mls",
    "policy_cap",
    "glb_te_def.spt",
    "attributes",
    ".te",
    "glb_roles.spt",
    "users",
    "initial_sid_contexts",
    "fs_use",
    "virtfs_contexts",
]

# CIL statement keywords that build_version_cil hands to deal_with_policy.
POLICY_TYPE_LIST = [
    "allow",
    "auditallow",
    "dontaudit",
    "allowx",
    "auditallowx",
    "dontauditx",
    "neverallow",
    "neverallowx",
]
52
53
class PolicyDirList(object):
    """Plain container for the four lists of policy directories."""

    def __init__(
        self,
        min_policy_dir_list,
        system_policy_dir_list,
        vendor_policy_dir_list,
        public_policy_dir_list,
    ):
        # Each argument is stored as-is (no copy is made).
        self.min_policy_dir_list = min_policy_dir_list
        self.system_policy_dir_list = system_policy_dir_list
        self.vendor_policy_dir_list = vendor_policy_dir_list
        self.public_policy_dir_list = public_policy_dir_list
60
61
class PolicyFileList(object):
    """Plain container for the four lists of policy source files."""

    def __init__(
        self,
        min_policy_file_list,
        system_policy_file_list,
        vendor_policy_file_list,
        public_policy_file_list,
    ):
        # Each argument is stored as-is (no copy is made).
        self.min_policy_file_list = min_policy_file_list
        self.system_policy_file_list = system_policy_file_list
        self.vendor_policy_file_list = vendor_policy_file_list
        self.public_policy_file_list = public_policy_file_list
68
69
def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Collect every directory below search_dir whose name equals folder_suffix.
    :param search_dir: directory tree to walk
    :param folder_suffix: exact directory name to look for
    :return: list of matching directory paths, ordered by parent directory
    """
    return [
        os.path.join(parent, name)
        for parent, sub_dirs, _ in sorted(os.walk(search_dir))
        for name in sub_dirs
        if name == folder_suffix
    ]
77
78
def traverse_folder_in_type(search_dir, file_suffix, build_root):
    """
    Find the files below search_dir whose names end with file_suffix.
    Every hit is converted to a path relative to build_root and passed to
    check_empty_row. That relative path is what check_empty_row opens, so it
    has to resolve from the current working directory (presumably the build
    root - confirm against the build invocation).
    :param search_dir: directory tree to walk
    :param file_suffix: file-name suffix to match
    :param build_root: directory the returned paths are relative to
    :return: (sorted list of relative paths, OR-ed check_empty_row results)
    """
    matched = []
    newline_flag = 0
    for parent, _, names in sorted(os.walk(search_dir)):
        for name in names:
            if not name.endswith(file_suffix):
                continue
            relative = os.path.relpath(os.path.join(parent, name), build_root)
            newline_flag |= check_empty_row(relative)
            matched.append(relative)
    return sorted(matched), newline_flag
91
92
def traverse_file_in_each_type(folder_list, sepolicy_type_list, build_root):
    """
    Gather the policy files of every type from every folder.
    The outer loop runs over the types, so all files of one type (across all
    folders) come before any file of the next type.
    :param folder_list: directories to search
    :param sepolicy_type_list: file-name suffixes, in the required order
    :param build_root: directory the returned paths are relative to
    :return: list of policy file paths relative to build_root
    :raises Exception: when any scanned file failed the end-of-file check
    """
    collected = []
    failed = 0
    for suffix in sepolicy_type_list:
        for folder in folder_list:
            found, flag = traverse_folder_in_type(folder, suffix, build_root)
            failed |= flag
            if found:
                collected.extend(found)
    if failed:
        raise Exception(failed)
    return collected
107
108
def check_empty_row(policy_file):
    """
    Check whether a policy file ends with a line terminator.
    Only the final byte is inspected, in binary mode, so the check neither
    loads the whole file nor depends on the file being decodable in the
    locale encoding.
    :param policy_file: path of the file to check
    :return: 0 when the file is empty or ends with a line terminator,
             1 (after printing a message) otherwise
    """
    with open(policy_file, 'rb') as fp:
        fp.seek(0, os.SEEK_END)
        if fp.tell() == 0:
            # An empty file has nothing to terminate.
            return 0
        fp.seek(-1, os.SEEK_END)
        last_byte = fp.read(1)

    # A lone '\r' is accepted as well: text mode with universal newlines
    # (what this check used before) reports it as '\n'.
    if last_byte in (b'\n', b'\r'):
        return 0

    print("".join([policy_file, " : need an empty line at end\n"]))
    return 1
125
126
def run_command(in_cmd):
    """
    Join the command fragments with spaces and run the result through the shell.
    Callers in this file pass fragments that contain shell syntax (pipes,
    redirections, several options in one string), so the shell is required.
    :param in_cmd: list of command fragments
    :raises Exception: carrying the exit status when it is non-zero
    """
    completed = subprocess.run(" ".join(in_cmd), shell=True)
    if completed.returncode != 0:
        raise Exception(completed.returncode)
132
133
def build_conf(args, output_conf, file_list, with_developer=False):
    """
    Run m4 over the policy sources and store its output as a .conf file.
    :param args: build arguments; debug_version and updater_version are read
    :param output_conf: path of the .conf file to write
    :param file_list: list of policy source files, passed to m4 in order
    :param with_developer: also define build_with_developer=enable
    :raises Exception: carrying the m4 exit status when it is non-zero
    """
    definitions = [
        "build_with_debug=" + args.debug_version,
        "build_with_updater=" + args.updater_version,
    ]
    if with_developer:
        definitions.append("build_with_developer=enable")

    macro_flags = []
    for definition in definitions:
        macro_flags.extend(["-D", definition])

    command = ["m4", "-s", "--fatal-warnings"] + macro_flags + file_list
    with open(output_conf, 'w') as conf_out:
        status = subprocess.run(command, shell=False, stdout=conf_out).returncode
    if status != 0:
        raise Exception(status)
144
145
def build_cil(args, output_cil, input_conf):
    """
    Invoke checkpolicy from args.tool_path to turn a .conf file into a .cil file.
    :param args: build arguments; tool_path is read
    :param output_cil: path of the .cil file to produce
    :param input_conf: path of the .conf file to compile
    """
    checkpolicy = os.path.join(args.tool_path, "checkpolicy")
    run_command([checkpolicy, input_conf, "-M -C -c 31", "-o " + output_cil])
152
153
def add_version(version, string):
    """
    Append "_<version>" to a name.
    :param version: version suffix
    :param string: name to extend
    :return: the versioned name
    """
    return string + "_" + version
156
157
def simplify_string(string):
    """
    Remove every parenthesis and newline character from a string.
    :param string: text to clean
    :return: the text without '(', ')' and newline characters
    """
    return string.translate(str.maketrans('', '', '()\n'))
160
161
def deal_with_roletype(version, cil_write, elem_list, type_set, file, line):
    """
    Write one roletype statement, versioning its type when it is in type_set.
    For a versioned type, a typeattribute declaration of the versioned name
    is written before the rewritten statement.
    :param version: version suffix to append
    :param cil_write: writable output file object
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: names that must be versioned
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than three elements
    """
    if len(elem_list) < 3:
        print('Error: invalid roletype in %s:%d' % (file, line))
        raise Exception(1)

    type_name = simplify_string(elem_list[2])
    if type_name in type_set:
        versioned = add_version(version, type_name)
        cil_write.write('(typeattribute ' + versioned + ')\n')
        elem_list[2] = elem_list[2].replace(type_name, versioned)
    cil_write.write(" ".join(elem_list))
173
174
def deal_with_typeattribute(version, cil_write, elem_list, type_set, file, line):
    """
    Write one typeattribute statement, versioning base_typeattr_* names.
    :param version: version suffix to append
    :param cil_write: writable output file object
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: not used by this function
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than two elements
    """
    if len(elem_list) < 2:
        print('Error: invalid typeattribute in %s:%d' % (file, line))
        raise Exception(1)

    attr_name = simplify_string(elem_list[1])
    if attr_name.startswith("base_typeattr_"):
        versioned = add_version(version, attr_name)
        elem_list[1] = elem_list[1].replace(attr_name, versioned)
    cil_write.write(" ".join(elem_list))
185
186
def deal_with_typeattributeset(version, cil_write, elem_list, type_set, file, line):
    """
    Write one typeattributeset statement with versioned names.
    Every element after the keyword is versioned when its cleaned-up name
    starts with "base_typeattr_" or is a member of type_set.
    :param version: version suffix to append
    :param cil_write: writable output file object
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: names that must be versioned
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than two elements
    """
    if len(elem_list) < 2:
        print('Error: invalid typeattributeset in %s:%d' % (file, line))
        raise Exception(1)

    for position in range(1, len(elem_list)):
        token = elem_list[position]
        name = simplify_string(token)
        if name.startswith("base_typeattr_") or name in type_set:
            elem_list[position] = token.replace(name, add_version(version, name))
    cil_write.write(" ".join(elem_list))
197
198
def deal_with_policy(version, cil_write, elem_list, type_set, file, line):
    """
    Write one policy rule with versioned names in its second and third element.
    Those two elements are versioned when their cleaned-up name starts with
    "base_typeattr_" or is a member of type_set; the rest is left alone.
    :param version: version suffix to append
    :param cil_write: writable output file object
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: names that must be versioned
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than four elements
    """
    if len(elem_list) < 4:
        print('Error: invalid policy in %s:%d' % (file, line))
        raise Exception(1)

    # The length check above guarantees that positions 1 and 2 exist.
    for position in (1, 2):
        token = elem_list[position]
        name = simplify_string(token)
        if name.startswith("base_typeattr_") or name in type_set:
            elem_list[position] = token.replace(name, add_version(version, name))
    cil_write.write(" ".join(elem_list))
209
210
def deal_with_type(version, cil_write, elem_list, file, line):
    """
    Write the three statements that tie a type to its versioned attribute:
    a typeattributeset, an expandtypeattribute and a typeattribute statement.
    :param version: version suffix to append
    :param cil_write: writable output file object
    :param elem_list: the type statement split on single spaces
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than two elements
    """
    if len(elem_list) < 2:
        print('Error: invalid type in %s:%d' % (file, line))
        raise Exception(1)

    type_name = simplify_string(elem_list[1])
    versioned = add_version(version, type_name)
    statements = (
        ['(typeattributeset', versioned, '(', type_name, '))\n'],
        ['(expandtypeattribute', '(', versioned, ') true)\n'],
        ['(typeattribute', versioned, ')\n'],
    )
    for parts in statements:
        cil_write.write(" ".join(parts))
220
221
def build_version_cil(version, cil_file_input, cil_file_output, type_set):
    """
    Copy a CIL file while versioning the names used in its statements.
    Lines that do not start with '(' are dropped. roletype, typeattribute,
    typeattributeset and policy-rule statements go through their
    deal_with_* helper; every other statement is copied unchanged.
    :param version: version suffix to append
    :param cil_file_input: path of the CIL file to read
    :param cil_file_output: path of the CIL file to write
    :param type_set: names that must be versioned
    :raises Exception: when a helper rejects a malformed statement
    """
    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
        # line_no (not the line text) is what the helpers format with %d in
        # their error messages.
        for line_no, line in enumerate(cil_read, start=1):
            if not line.startswith('('):
                continue

            elem_list = line.split(' ')
            keyword = elem_list[0]

            if keyword == '(type':
                # Type declarations are copied verbatim.
                cil_write.write(line)
            elif keyword == '(roletype':
                deal_with_roletype(version, cil_write, elem_list, type_set, cil_file_input, line_no)
            elif keyword == '(typeattribute':
                deal_with_typeattribute(version, cil_write, elem_list, type_set, cil_file_input, line_no)
            elif keyword == '(typeattributeset':
                deal_with_typeattributeset(version, cil_write, elem_list, type_set, cil_file_input, line_no)
            elif simplify_string(keyword) in POLICY_TYPE_LIST:
                deal_with_policy(version, cil_write, elem_list, type_set, cil_file_input, line_no)
            else:
                cil_write.write(line)
246
247
def build_type_version_cil(version, cil_file_input, cil_file_output):
    """
    Write the versioned-attribute statements for every type of a CIL file.
    Only lines whose first element is '(type' produce output (through
    deal_with_type); all other lines are ignored.
    :param version: version suffix to append
    :param cil_file_input: path of the CIL file to read
    :param cil_file_output: path of the CIL file to write
    :raises Exception: when deal_with_type rejects a malformed statement
    """
    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
        for line_no, line in enumerate(cil_read, start=1):
            if not line.startswith('('):
                continue

            elem_list = line.split(' ')
            if elem_list[0] == '(type':
                # Pass the input file name and line number so the error
                # message of deal_with_type points at the offending line.
                deal_with_type(version, cil_write, elem_list, cil_file_input, line_no)
262
263
def get_type_set(cil_file_input):
    """
    Collect the names declared by single-line statements of a CIL file.
    :param cil_file_input: path of the CIL file to scan
    :return: set of names captured by the first pattern that matches a line
    """
    # Tried in this order; the first pattern that matches a line wins.
    # NOTE(review): the second pattern looks for "type_attribute", while the
    # rest of this file handles the keyword "typeattribute" - confirm that
    # this spelling is intended.
    patterns = (
        re.compile(r'^\(type (.*)\)$'),
        re.compile(r'^\(type_attribute (base_typeattr_[0-9]+)\)$'),
    )
    names = set()
    with open(cil_file_input, 'r') as cil_read:
        for line in cil_read:
            for pattern in patterns:
                found = pattern.match(line)
                if found:
                    names.add(found.group(1))
                    break
    return names
277
278
def add_base_typeattr_prefix(cil_file, prefix):
    """
    Rewrite a CIL file in place, turning every "base_typeattr_" into
    "<prefix>_base_typeattr_".
    :param cil_file: path of the CIL file to rewrite
    :param prefix: text put in front of each occurrence
    """
    with open(cil_file, 'r') as cil_read:
        original = cil_read.read()
    renamed = original.replace('base_typeattr_', prefix + '_' + 'base_typeattr_')
    with open(cil_file, 'w') as cil_write:
        cil_write.write(renamed)
285
286
def build_binary_policy(tool_path, output_policy, check_neverallow, cil_list):
    """
    Invoke secilc from tool_path to compile CIL files into a binary policy.
    :param tool_path: directory that contains the secilc executable
    :param output_policy: path of the binary policy to write
    :param check_neverallow: when false, "-N" is appended to the command
    :param cil_list: CIL files to compile
    """
    secilc = os.path.join(tool_path, "secilc")
    command = [
        secilc,
        " ".join(cil_list),
        "-m -M true -G -c 31",
        "-f /dev/null",
        "-o " + output_policy,
    ]
    if not check_neverallow:
        command += ["-N"]
    run_command(command)
296
297
def prepare_build_path(dir_list, root_dir, build_dir_list, sepolicy_path):
    """
    Resolve the policy directories to build and append them to build_dir_list.
    The "base" and "ohos_policy" directories below sepolicy_path always come
    first, followed by the entries of dir_list. Empty entries and the entry
    "default" are skipped.
    :param dir_list: ':'-separated directory names
    :param root_dir: directory each entry is joined onto
    :param build_dir_list: output list; existing directories are appended
    :param sepolicy_path: directory that holds "base" and "ohos_policy"
    :raises SystemExit: with code -1 when a requested directory is missing
    """
    build_policy_list = [os.path.join(sepolicy_path, "base"), os.path.join(sepolicy_path, "ohos_policy")]
    build_policy_list += dir_list.split(":")

    for entry in build_policy_list:
        if entry in ("", "default"):
            continue
        path = os.path.join(root_dir, entry)
        if not os.path.exists(path):
            print("following path not exists!! {}".format(path))
            # Raised directly instead of calling the exit() builtin, which is
            # installed by the site module and meant for interactive use.
            raise SystemExit(-1)
        build_dir_list.append(path)
311
312
def get_policy_dir_list(args):
    """
    Build the directory lists for the min, system, vendor and public policies.
    :param args: build arguments; source_root_dir and policy_dir_list are read
    :return: PolicyDirList holding the four directory lists
    """
    sepolicy_path = os.path.join(args.source_root_dir, "base/security/selinux_adapter/sepolicy/")
    build_dirs = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, build_dirs, sepolicy_path)

    public_policy = []
    system_policy = []
    vendor_policy = []
    for build_dir in build_dirs:
        public_policy.extend(traverse_folder_in_dir_name(build_dir, "public"))
        system_policy.extend(traverse_folder_in_dir_name(build_dir, "system"))
        vendor_policy.extend(traverse_folder_in_dir_name(build_dir, "vendor"))

    min_policy_dir_list = [os.path.join(sepolicy_path, "min")]
    # The base/te folder is appended to the system, vendor and public lists
    # (but not to the min list).
    base_te = [os.path.join(sepolicy_path, "base/te")]

    return PolicyDirList(
        min_policy_dir_list,
        public_policy + system_policy + base_te,
        public_policy + vendor_policy + min_policy_dir_list + base_te,
        public_policy + min_policy_dir_list + base_te,
    )
338
339
def get_policy_file_list(args, dir_list_object):
    """
    Collect the policy source files for each of the four directory lists.
    Paths are made relative to the directory three levels above args.tool_path.
    :param args: build arguments; tool_path is read
    :param dir_list_object: PolicyDirList with the directories to scan
    :return: PolicyFileList holding the four file lists
    """
    build_root = os.path.abspath(os.path.join(args.tool_path, "../../.."))

    def collect(policy_dirs):
        return traverse_file_in_each_type(policy_dirs, SEPOLICY_TYPE_LIST, build_root)

    # Scanned in the order system, vendor, public, min.
    system_files = collect(dir_list_object.system_policy_dir_list)
    vendor_files = collect(dir_list_object.vendor_policy_dir_list)
    public_files = collect(dir_list_object.public_policy_dir_list)
    min_files = collect(dir_list_object.min_policy_dir_list)

    return PolicyFileList(min_files, system_files, vendor_files, public_files)
354
355
def filter_out(pattern_file, input_file):
    """
    Remove from input_file every line that also occurs in pattern_file.
    Lines are compared whole, including their line terminator. The filtered
    content, followed by one extra newline, replaces input_file.
    :param pattern_file: path of the file whose lines are removed
    :param input_file: path of the file to filter in place
    """
    with open(pattern_file, 'r', encoding='utf-8') as pat_file:
        patterns = set(pat_file)

    # The context manager closes and deletes the temporary file even when
    # reading or copying fails; it was previously left to garbage collection.
    with tempfile.NamedTemporaryFile() as tmp_output:
        with open(input_file, 'r', encoding='utf-8') as in_file:
            tmp_output.writelines(line.encode(encoding='utf-8') for line in in_file
                                  if line not in patterns)
        tmp_output.write("\n".encode(encoding='utf-8'))
        # Flush so that copyfile, which reopens the file by name, sees all data.
        tmp_output.flush()
        shutil.copyfile(tmp_output.name, input_file)
366
367
def generate_hash_file(input_file_list, output_file):
    """
    Write the sha256 of the concatenated input files to output_file.
    The output is the hexadecimal digest followed by a newline. Hashing is
    done in-process, so an unreadable input raises instead of silently
    producing the digest of partial data, and paths need no shell quoting.
    :param input_file_list: files to hash, in concatenation order
    :param output_file: path of the digest file to write
    :raises OSError: when an input cannot be read or the output not written
    """
    # Imported here because hashlib is not among the file's top-level imports.
    import hashlib

    digest = hashlib.sha256()
    for path in input_file_list:
        with open(path, 'rb') as source:
            for chunk in iter(lambda: source.read(65536), b''):
                digest.update(chunk)

    with open(output_file, 'w') as hash_out:
        hash_out.write(digest.hexdigest() + "\n")
376
377
def generate_version_file(args, output_file):
    """
    Write args.vendor_policy_version, followed by a newline, to output_file.
    The file is written directly rather than through a shell "echo", so the
    version text and the path are never interpreted as shell syntax.
    :param args: build arguments; vendor_policy_version is read
    :param output_file: path of the version file to write
    """
    with open(output_file, 'w') as version_out:
        version_out.write(args.vendor_policy_version + "\n")
382
383
def generate_default_policy(args, policy, cil_list, with_developer=False):
    """
    Build system.cil and vendor.cil for the default component layout.
    Output goes to the directory of args.dst_file, or to its "developer/"
    sub-directory (created when missing) when with_developer is set.
    :param args: build arguments; dst_file is read here
    :param policy: PolicyFileList with the source files
    :param cil_list: output list; vendor.cil and system.cil paths are appended
    :param with_developer: build the developer variant
    """
    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    if with_developer:
        output_path = os.path.join(output_path, "developer/")
        if not os.path.exists(output_path):
            os.makedirs(output_path)

    def in_output(name):
        return os.path.join(output_path, name)

    system_output_conf = in_output("system.conf")
    vendor_output_conf = in_output("vendor.conf")
    min_output_conf = in_output("min.conf")
    system_cil_path = in_output("system.cil")
    vendor_cil_path = in_output("vendor.cil")
    min_cil_path = in_output("min.cil")

    # system: conf, then cil
    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
    build_cil(args, system_cil_path, system_output_conf)

    # vendor: conf, then cil, then prefix its base_typeattr_ names
    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
    build_cil(args, vendor_cil_path, vendor_output_conf)
    add_base_typeattr_prefix(vendor_cil_path, 'vendor')

    # min: conf, then cil (always built without the developer define)
    build_conf(args, min_output_conf, policy.min_policy_file_list)
    build_cil(args, min_cil_path, min_output_conf)

    # drop from vendor.cil every line that min.cil already contains
    filter_out(min_cil_path, vendor_cil_path)

    cil_list.extend([vendor_cil_path, system_cil_path])
418
419
def generate_special_policy(args, policy, cil_list, with_developer=False):
    """
    Build the system, vendor, public and per-version CIL files.
    Output goes to the directory of args.dst_file, or to its "developer/"
    sub-directory when with_developer is set; a "compatible/" sub-directory
    is created below it. A hash file is written when args.components is
    "system" or "vendor", and a "version" file is always written.
    :param args: build arguments; dst_file, vendor_policy_version and
                 components are read here
    :param policy: PolicyFileList with the source files
    :param cil_list: output list; the vendor, system, per-version and public
                     CIL paths are appended in that order
    :param with_developer: build the developer variant
    """
    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    if with_developer:
        output_path = os.path.join(output_path, "developer/")
        if not os.path.exists(output_path):
            os.makedirs(output_path)

    compatible_dir = os.path.join(output_path, "compatible/")
    if not os.path.exists(compatible_dir):
        os.makedirs(compatible_dir)

    def in_output(name):
        return os.path.join(output_path, name)

    version = args.vendor_policy_version

    # intermediate files
    system_output_conf = in_output("system.conf")
    vendor_output_conf = in_output("vendor.conf")
    public_output_conf = in_output("public.conf")
    min_output_conf = in_output("min.conf")
    vendor_origin_cil_path = in_output("vendor_origin.cil")
    public_origin_cil_path = in_output("public_origin.cil")
    min_cil_path = in_output("min.cil")

    # output files
    system_cil_path = in_output("system.cil")
    vendor_cil_path = in_output("vendor.cil")
    public_version_cil_path = in_output("public.cil")
    type_version_cil_path = in_output("compatible/" + version + ".cil")

    # system.cil
    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
    build_cil(args, system_cil_path, system_output_conf)

    # min.cil (always built without the developer define)
    build_conf(args, min_output_conf, policy.min_policy_file_list)
    build_cil(args, min_cil_path, min_output_conf)

    # public.cil: the type set is taken before the min lines are removed
    build_conf(args, public_output_conf, policy.public_policy_file_list, with_developer)
    build_cil(args, public_origin_cil_path, public_output_conf)
    type_set = get_type_set(public_origin_cil_path)
    filter_out(min_cil_path, public_origin_cil_path)
    build_version_cil(version, public_origin_cil_path, public_version_cil_path, type_set)

    # vendor.cil: versioned with the public type set, then stripped of the
    # lines already present in public.cil
    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
    build_cil(args, vendor_origin_cil_path, vendor_output_conf)
    filter_out(min_cil_path, vendor_origin_cil_path)
    build_version_cil(version, vendor_origin_cil_path, vendor_cil_path, type_set)
    filter_out(public_version_cil_path, vendor_cil_path)

    # compatible/<version>.cil
    build_type_version_cil(version, public_origin_cil_path, type_version_cil_path)

    hashed_inputs = [system_cil_path, type_version_cil_path]
    if args.components == "system":
        generate_hash_file(hashed_inputs, in_output(SYSTEM_CIL_HASH))
    elif args.components == "vendor":
        generate_hash_file(hashed_inputs, in_output(PREBUILD_SEPOLICY_SYSTEM_CIL_HASH))

    generate_version_file(args, in_output("version"))

    cil_list.extend([vendor_cil_path, system_cil_path, type_version_cil_path, public_version_cil_path])
484
485
def compile_sepolicy(args):
    """
    Generate the CIL files and compile them into binary policies.
    With updater_version "enable" only the default policy is generated and
    compiled to args.dst_file. Otherwise the normal and developer variants
    are generated on two worker threads and then compiled on two worker
    threads, the developer policy going to "developer/policy.31" next to
    args.dst_file.
    :param args: build arguments
    """
    dir_list_object = get_policy_dir_list(args)
    file_list_object = get_policy_file_list(args, dir_list_object)
    cil_list = []
    developer_cil_list = []

    if args.updater_version == "enable":
        generate_default_policy(args, file_list_object, cil_list, False)
        build_binary_policy(args.tool_path, args.dst_file, True, cil_list)
        return

    if args.components == "default":
        generate = generate_default_policy
    else:
        generate = generate_special_policy

    with ThreadPoolExecutor(max_workers=2) as executor:
        generation_jobs = [
            executor.submit(generate, args, file_list_object, cil_list, False),
            executor.submit(generate, args, file_list_object, developer_cil_list, True),
        ]
        for future in as_completed(generation_jobs):
            # result() re-raises whatever the worker raised.
            future.result()

    developer_policy_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/policy.31")
    with ThreadPoolExecutor(max_workers=2) as executor:
        compile_jobs = [
            executor.submit(build_binary_policy, args.tool_path, args.dst_file, True, cil_list),
            executor.submit(build_binary_policy, args.tool_path, developer_policy_path,
                            True, developer_cil_list),
        ]
        for future in as_completed(compile_jobs):
            future.result()
524
525
def copy_user_policy(args):
    """
    Copy the built policies to "user_policy" and "developer/developer_policy".
    Nothing is copied when updater_version is "enable". The sources come from
    the "pre_check/" sub-directory when debug_version is "enable", otherwise
    from the directory of args.dst_file itself.
    :param args: build arguments; updater_version, debug_version and
                 dst_file are read
    """
    if args.updater_version == "enable":
        return

    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    src_dir = os.path.join(output_path, "pre_check/") if args.debug_version == "enable" else output_path

    copies = (
        ("developer/policy.31", "developer/developer_policy"),
        ("policy.31", "user_policy"),
    )
    for src_name, dest_name in copies:
        shutil.copyfile(os.path.join(src_dir, src_name), os.path.join(output_path, dest_name))
543
544
def pre_check(args):
    """
    Compile the policy a second time with the opposite debug setting.
    The result goes to "pre_check/policy.31" next to the original
    args.dst_file. args is modified in place: dst_file is redirected and
    debug_version is flipped between "enable" and "disable".
    :param args: build arguments
    """
    output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "pre_check/")
    if not os.path.exists(output_path):
        os.makedirs(output_path)

    args.dst_file = os.path.join(output_path, "policy.31")
    args.debug_version = "disable" if args.debug_version == "enable" else "enable"

    compile_sepolicy(args)
557
558
def main(args):
    """
    Run the pre-check build and the regular build on two worker threads,
    then copy the resulting user policies.
    Each build gets its own deep copy of args, so the changes pre_check makes
    to its arguments do not reach the regular build or copy_user_policy.
    :param args: build arguments
    """
    with ThreadPoolExecutor(max_workers=2) as executor:
        pre_check_thread = executor.submit(pre_check, copy.deepcopy(args))
        compile_thread = executor.submit(compile_sepolicy, copy.deepcopy(args))

        for future in as_completed([pre_check_thread, compile_thread]):
            # result() re-raises whatever the worker raised.
            future.result()

    copy_user_policy(args)
574