• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2023 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import re
22import shutil
23import subprocess
24import tempfile
25from concurrent.futures import ThreadPoolExecutor, as_completed
26import copy
27
28SYSTEM_CIL_HASH = "system.cil.sha256"
29PREBUILD_SEPOLICY_SYSTEM_CIL_HASH = "prebuild_sepolicy.system.cil.sha256"
30
31# list of all macros and te for sepolicy build
32SEPOLICY_TYPE_LIST = ["security_classes",
33                      "initial_sids",
34                      "access_vectors",
35                      "glb_perm_def.spt",
36                      "glb_never_def.spt",
37                      "mls",
38                      "policy_cap",
39                      "glb_te_def.spt",
40                      "attributes",
41                      ".te",
42                      "glb_roles.spt",
43                      "users",
44                      "initial_sid_contexts",
45                      "fs_use",
46                      "virtfs_contexts",
47                      ]
48
49POLICY_TYPE_LIST = ["allow", "auditallow", "dontaudit",
50                    "allowx", "auditallowx", "dontauditx",
51                    "neverallow", "neverallowx", ]
52
53
54class PolicyDirList(object):
55    def __init__(self, min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list):
56        self.min_policy_dir_list = min_policy_dir_list
57        self.system_policy_dir_list = system_policy_dir_list
58        self.vendor_policy_dir_list = vendor_policy_dir_list
59        self.public_policy_dir_list = public_policy_dir_list
60
61
62class PolicyFileList(object):
63    def __init__(self, min_policy_file_list, system_policy_file_list, vendor_policy_file_list, public_policy_file_list):
64        self.min_policy_file_list = min_policy_file_list
65        self.system_policy_file_list = system_policy_file_list
66        self.vendor_policy_file_list = vendor_policy_file_list
67        self.public_policy_file_list = public_policy_file_list
68
69
70def traverse_folder_in_dir_name(search_dir, folder_suffix):
71    folder_list = []
72    for root, dirs, _ in sorted(os.walk(search_dir)):
73        for dir_i in dirs:
74            if dir_i == folder_suffix:
75                folder_list.append(os.path.join(root, dir_i))
76    return folder_list
77
78
79def traverse_folder_in_type(search_dir, file_suffix, build_root):
80    policy_file_list = []
81    flag = 0
82    for root, _, files in sorted(os.walk(search_dir)):
83        for each_file in files:
84            if each_file.endswith(file_suffix):
85                path = os.path.join(root, each_file)
86                rel_path = os.path.relpath(path, build_root)
87                flag |= check_empty_row(rel_path)
88                policy_file_list.append(rel_path)
89    policy_file_list.sort()
90    return policy_file_list, flag
91
92
93def traverse_file_in_each_type(folder_list, sepolicy_type_list, build_root):
94    policy_files_list = []
95    err = 0
96    for policy_type in sepolicy_type_list:
97        for folder in folder_list:
98            type_file_list, flag = traverse_folder_in_type(
99                folder, policy_type, build_root)
100            err |= flag
101            if len(type_file_list) == 0:
102                continue
103            policy_files_list.extend(type_file_list)
104    if err:
105        raise Exception(err)
106    return policy_files_list
107
108
109def check_empty_row(policy_file):
110    """
111    Check whether the last line of te file is empty.
112    :param policy_file: te file
113    :return:
114    """
115    err = 0
116    with open(policy_file, 'r') as fp:
117        lines = fp.readlines()
118        if len(lines) == 0:
119            return 0
120        last_line = lines[-1]
121        if '\n' not in last_line:
122            print("".join([policy_file, " : need an empty line at end\n"]))
123            err = 1
124    return err
125
126
127def run_command(in_cmd):
128    cmdstr = " ".join(in_cmd)
129    ret = subprocess.run(cmdstr, shell=True).returncode
130    if ret != 0:
131        raise Exception(ret)
132
133
134def build_conf(args, output_conf, file_list, with_developer=False):
135    m4_args = ["-D", "build_with_debug=" + args.debug_version]
136    m4_args += ["-D", "build_with_updater=" + args.updater_version]
137    if with_developer:
138        m4_args += ["-D", "build_with_developer=enable"]
139    if getattr(args, "product_args", None):
140        for product_arg in args.product_args:
141            m4_args += ["-D", product_arg]
142    build_conf_cmd = ["m4", "-s", "--fatal-warnings"] + m4_args + file_list
143    with open(output_conf, 'w') as fd:
144        ret = subprocess.run(build_conf_cmd, shell=False, stdout=fd).returncode
145        if ret != 0:
146            raise Exception(ret)
147
148
149def build_cil(args, output_cil, input_conf):
150    check_policy_cmd = [os.path.join(args.tool_path, "checkpolicy"),
151                        input_conf,
152                        "-M -C -c 31",
153                        "-o " + output_cil]
154    run_command(check_policy_cmd)
155
156
157def add_version(version, string):
158    return "".join([string, "_", version])
159
160
161def simplify_string(string):
162    return string.replace('(', '').replace(')', '').replace('\n', '')
163
164
165def deal_with_roletype(version, cil_write, elem_list, type_set, file, line):
166    if len(elem_list) < 3:
167        print('Error: invalid roletype in %s:%d' % (file, line))
168        raise Exception(1)
169
170    sub_string = simplify_string(elem_list[2])
171    if sub_string in type_set:
172        cil_write.write('(typeattribute ' + add_version(version, sub_string) + ')\n')
173        elem_list[2] = elem_list[2].replace(
174            sub_string, add_version(version, sub_string))
175    cil_write.write(" ".join(elem_list))
176
177
178def deal_with_typeattribute(version, cil_write, elem_list, type_set, file, line):
179    if len(elem_list) < 2:
180        print('Error: invalid typeattribute in %s:%d' % (file, line))
181        raise Exception(1)
182
183    sub_string = simplify_string(elem_list[1])
184    if sub_string.startswith("base_typeattr_"):
185        elem_list[1] = elem_list[1].replace(
186            sub_string, add_version(version, sub_string))
187    cil_write.write(" ".join(elem_list))
188
189
190def deal_with_typeattributeset(version, cil_write, elem_list, type_set, file, line):
191    if len(elem_list) < 2:
192        print('Error: invalid typeattributeset in %s:%d' % (file, line))
193        raise Exception(1)
194
195    for index, elem in enumerate(elem_list[1:]):
196        sub_string = simplify_string(elem)
197        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
198            elem_list[index + 1] = elem.replace(sub_string, add_version(version, sub_string))
199    cil_write.write(" ".join(elem_list))
200
201
202def deal_with_policy(version, cil_write, elem_list, type_set, file, line):
203    if len(elem_list) < 4:
204        print('Error: invalid policy in %s:%d' % (file, line))
205        raise Exception(1)
206
207    for index, elem in enumerate(elem_list[1:3]):
208        sub_string = simplify_string(elem)
209        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
210            elem_list[index + 1] = elem.replace(sub_string, add_version(version, sub_string))
211    cil_write.write(" ".join(elem_list))
212
213
214def deal_with_type(version, cil_write, elem_list, file, line):
215    if len(elem_list) < 2:
216        print('Error: invalid type in %s:%d' % (file, line))
217        raise Exception(1)
218
219    sub_string = simplify_string(elem_list[1])
220    cil_write.write(" ".join(['(typeattributeset', add_version(version, sub_string), '(', sub_string, '))\n']))
221    cil_write.write(" ".join(['(expandtypeattribute', '(', add_version(version, sub_string), ') true)\n']))
222    cil_write.write(" ".join(['(typeattribute', add_version(version, sub_string), ')\n']))
223
224
225def build_version_cil(version, cil_file_input, cil_file_output, type_set):
226    index = 0
227    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
228        for line in cil_read:
229            index += 1
230            if not line.startswith('('):
231                continue
232
233            elem_list = line.split(' ')
234            if not elem_list:
235                continue
236
237            if elem_list[0] == '(type':
238                cil_write.write(line)
239            elif elem_list[0] == '(roletype':
240                deal_with_roletype(version, cil_write, elem_list, type_set, cil_file_input, line)
241            elif elem_list[0] == '(typeattribute':
242                deal_with_typeattribute(version, cil_write, elem_list, type_set, cil_file_input, line)
243            elif elem_list[0] == '(typeattributeset':
244                deal_with_typeattributeset(version, cil_write, elem_list, type_set, cil_file_input, line)
245            elif simplify_string(elem_list[0]) in POLICY_TYPE_LIST:
246                deal_with_policy(version, cil_write, elem_list, type_set, cil_file_input, line)
247            else:
248                cil_write.write(line)
249
250
251def build_type_version_cil(version, cil_file_input, cil_file_output):
252    index = 0
253    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
254        for line in cil_read:
255            index += 1
256            if not line.startswith('('):
257                continue
258
259            elem_list = line.split(' ')
260            if not elem_list:
261                continue
262
263            if elem_list[0] == '(type':
264                deal_with_type(version, cil_write, elem_list, line, index)
265
266
267def get_type_set(cil_file_input):
268    pattern_type = re.compile(r'^\(type (.*)\)$')
269    pattern_typeattribute = re.compile(r'^\(type_attribute (base_typeattr_[0-9]+)\)$')
270    type_set = set()
271    with open(cil_file_input, 'r') as cil_read:
272        for line in cil_read:
273            match_type = pattern_type.match(line)
274            match_typeattribute = pattern_typeattribute.match(line)
275            if match_type:
276                type_set.add(match_type.group(1))
277            elif match_typeattribute:
278                type_set.add(match_typeattribute.group(1))
279    return type_set
280
281
282def add_base_typeattr_prefix(cil_file, prefix):
283    with open(cil_file, 'r') as cil_read:
284        data = cil_read.read()
285        data = data.replace('base_typeattr_', '_'.join([prefix, 'base_typeattr_']))
286        with open(cil_file, 'w') as cil_write:
287            cil_write.write(data)
288
289
290def build_binary_policy(tool_path, output_policy, check_neverallow, cil_list):
291    build_policy_cmd = [os.path.join(tool_path, "secilc"),
292                        " ".join(cil_list),
293                        "-m -M true -G -c 31 -O",
294                        "-f /dev/null",
295                        "-o " + output_policy]
296    if not check_neverallow:
297        build_policy_cmd.append("-N")
298    run_command(build_policy_cmd)
299
300
301def prepare_build_path(dir_list, root_dir, build_dir_list, sepolicy_path):
302    build_policy_list = [os.path.join(sepolicy_path, "base"), os.path.join(sepolicy_path, "ohos_policy")]
303    build_policy_list += dir_list.split(":")
304
305    for i in build_policy_list:
306        if i == "" or i == "default":
307            continue
308        path = os.path.join(root_dir, i)
309        if (os.path.exists(path)):
310            build_dir_list.append(path)
311        else:
312            print("following path not exists!! {}".format(path))
313            exit(-1)
314
315
316def get_policy_dir_list(args):
317    sepolicy_path = os.path.join(args.source_root_dir, "base/security/selinux_adapter/sepolicy/")
318    dir_list = []
319    prepare_build_path(args.policy_dir_list, args.source_root_dir, dir_list, sepolicy_path)
320    min_policy_dir_list = [os.path.join(sepolicy_path, "min")]
321    system_policy = []
322    public_policy = []
323    vendor_policy = []
324
325    for item in dir_list:
326        public_policy += traverse_folder_in_dir_name(item, "public")
327        system_policy += traverse_folder_in_dir_name(item, "system")
328        vendor_policy += traverse_folder_in_dir_name(item, "vendor")
329
330    # list of all policy folders
331    system_policy_dir_list = public_policy + system_policy
332    vendor_policy_dir_list = public_policy + vendor_policy + min_policy_dir_list
333    public_policy_dir_list = public_policy + min_policy_dir_list
334
335    # add temp dirs base/te folders
336    system_policy_dir_list.append(os.path.join(sepolicy_path, "base/te"))
337    vendor_policy_dir_list.append(os.path.join(sepolicy_path, "base/te"))
338    public_policy_dir_list.append(os.path.join(sepolicy_path, "base/te"))
339
340    return PolicyDirList(min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list)
341
342
343def get_policy_file_list(args, dir_list_object):
344    build_root = os.getcwd()
345    # list of all policy files
346    system_policy_file_list = traverse_file_in_each_type(
347        dir_list_object.system_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
348    vendor_policy_file_list = traverse_file_in_each_type(
349        dir_list_object.vendor_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
350    public_policy_file_list = traverse_file_in_each_type(
351        dir_list_object.public_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
352    min_policy_file_list = traverse_file_in_each_type(
353        dir_list_object.min_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
354
355    return PolicyFileList(min_policy_file_list, system_policy_file_list, vendor_policy_file_list,
356                          public_policy_file_list)
357
358
359def filter_out(pattern_file, input_file):
360    with open(pattern_file, 'r', encoding='utf-8') as pat_file:
361        patterns = set(pat_file)
362        tmp_output = tempfile.NamedTemporaryFile()
363        with open(input_file, 'r', encoding='utf-8') as in_file:
364            tmp_output.writelines(line.encode(encoding='utf-8') for line in in_file.readlines()
365                                if line not in patterns)
366            tmp_output.write("\n".encode(encoding='utf-8'))
367            tmp_output.flush()
368        shutil.copyfile(tmp_output.name, input_file)
369
370
371def generate_hash_file(input_file_list, output_file):
372    build_policy_cmd = ["cat",
373                        " ".join(input_file_list),
374                        "| sha256sum",
375                        "| cut -d' ' -f1",
376                        ">",
377                        output_file]
378    run_command(build_policy_cmd)
379
380
381def generate_version_file(args, output_file):
382    cmd = ["echo", args.vendor_policy_version,
383           ">", output_file]
384    run_command(cmd)
385
386
387def generate_default_policy(args, policy, cil_list, with_developer=False):
388    if with_developer:
389        output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/")
390        if not os.path.exists(output_path):
391            os.makedirs(output_path)
392    else:
393        output_path = os.path.abspath(os.path.dirname(args.dst_file))
394    system_output_conf = os.path.join(output_path, "system.conf")
395    vendor_output_conf = os.path.join(output_path, "vendor.conf")
396    min_output_conf = os.path.join(output_path, "min.conf")
397
398    system_cil_path = os.path.join(output_path, "system.cil")
399    vendor_cil_path = os.path.join(output_path, "vendor.cil")
400    min_cil_path = os.path.join(output_path, "min.cil")
401
402    # build system.conf
403    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
404    # build system.cil
405    build_cil(args, system_cil_path, system_output_conf)
406
407    # build vendor.conf
408    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
409    # build vendor.cil
410    build_cil(args, vendor_cil_path, vendor_output_conf)
411    add_base_typeattr_prefix(vendor_cil_path, 'vendor')
412
413    # build min.conf
414    build_conf(args, min_output_conf, policy.min_policy_file_list)
415    # build min.cil
416    build_cil(args, min_cil_path, min_output_conf)
417
418    filter_out(min_cil_path, vendor_cil_path)
419
420    cil_list += [vendor_cil_path, system_cil_path]
421
422
423def generate_special_policy(args, policy, cil_list, with_developer=False):
424    if with_developer:
425        output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/")
426        if not os.path.exists(output_path):
427            os.makedirs(output_path)
428    else:
429        output_path = os.path.abspath(os.path.dirname(args.dst_file))
430
431    compatible_dir = os.path.join(output_path, "compatible/")
432    if not os.path.exists(compatible_dir):
433        os.makedirs(compatible_dir)
434
435    system_output_conf = os.path.join(output_path, "system.conf")
436    vendor_output_conf = os.path.join(output_path, "vendor.conf")
437    public_output_conf = os.path.join(output_path, "public.conf")
438    min_output_conf = os.path.join(output_path, "min.conf")
439
440    vendor_origin_cil_path = os.path.join(output_path, "vendor_origin.cil")
441    public_origin_cil_path = os.path.join(output_path, "public_origin.cil")
442    min_cil_path = os.path.join(output_path, "min.cil")
443
444    # output file
445    system_cil_path = os.path.join(output_path, "system.cil")
446    vendor_cil_path = os.path.join(output_path, "vendor.cil")
447    public_version_cil_path = os.path.join(output_path, "public.cil")
448    type_version_cil_path = os.path.join(output_path, "".join(["compatible/", args.vendor_policy_version, ".cil"]))
449
450    # build system.conf
451    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
452    # build system.cil
453    build_cil(args, system_cil_path, system_output_conf)
454
455    # build min.cil
456    build_conf(args, min_output_conf, policy.min_policy_file_list)
457    build_cil(args, min_cil_path, min_output_conf)
458
459    # build public.cil
460    build_conf(args, public_output_conf, policy.public_policy_file_list, with_developer)
461    build_cil(args, public_origin_cil_path, public_output_conf)
462    type_set = get_type_set(public_origin_cil_path)
463    filter_out(min_cil_path, public_origin_cil_path)
464    build_version_cil(args.vendor_policy_version, public_origin_cil_path, public_version_cil_path, type_set)
465
466    # build vendor.cil
467    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
468    build_cil(args, vendor_origin_cil_path, vendor_output_conf)
469    filter_out(min_cil_path, vendor_origin_cil_path)
470    build_version_cil(args.vendor_policy_version, vendor_origin_cil_path, vendor_cil_path, type_set)
471    filter_out(public_version_cil_path, vendor_cil_path)
472
473    build_type_version_cil(args.vendor_policy_version, public_origin_cil_path, type_version_cil_path)
474
475    if args.components == "system":
476        generate_hash_file([system_cil_path, type_version_cil_path],
477                           os.path.join(output_path, SYSTEM_CIL_HASH))
478
479    elif args.components == "vendor":
480        generate_hash_file([system_cil_path, type_version_cil_path], os.path.join(
481            output_path, PREBUILD_SEPOLICY_SYSTEM_CIL_HASH))
482
483    version_file = os.path.join(output_path, "version")
484    generate_version_file(args, version_file)
485
486    cil_list += [vendor_cil_path, system_cil_path, type_version_cil_path, public_version_cil_path]
487
488
489def compile_sepolicy(args):
490    dir_list_object = get_policy_dir_list(args)
491    file_list_object = get_policy_file_list(args, dir_list_object)
492    cil_list = []
493    developer_cil_list = []
494
495    if args.updater_version == "enable":
496        generate_default_policy(args, file_list_object, cil_list, False)
497        build_binary_policy(args.tool_path, args.dst_file, True, cil_list)
498        return
499
500    with ThreadPoolExecutor(max_workers=2) as executor:
501        if args.components == "default":
502            normal_thread = executor.submit(generate_default_policy, args, file_list_object, cil_list, False)
503            developer_thread = executor.submit(generate_default_policy, args, file_list_object,
504                                                developer_cil_list, True)
505        else:
506            normal_thread = executor.submit(generate_special_policy, args, file_list_object, cil_list, False)
507            developer_thread = executor.submit(generate_special_policy, args, file_list_object,
508                                                developer_cil_list, True)
509
510        for future in as_completed([normal_thread, developer_thread]):
511            try:
512                future.result()
513            except Exception as e:
514                raise e
515
516    with ThreadPoolExecutor(max_workers=2) as executor:
517        build_normal_thread = executor.submit(build_binary_policy, args.tool_path, args.dst_file, True, cil_list)
518        developer_policy_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/policy.31")
519        build_developer_thread = executor.submit(build_binary_policy, args.tool_path, developer_policy_path,
520                                                    True, developer_cil_list)
521
522        for future in as_completed([build_normal_thread, build_developer_thread]):
523            try:
524                future.result()
525            except Exception as e:
526                raise e
527
528
529def copy_user_policy(args):
530    if args.updater_version == "enable":
531        return
532
533    output_path = os.path.abspath(os.path.dirname(args.dst_file))
534    if args.debug_version == "enable":
535        src_dir = os.path.join(output_path, "pre_check/")
536    else:
537        src_dir = output_path
538
539    src_policy_path = os.path.join(src_dir, "developer/policy.31")
540    dest_policy_path = os.path.join(output_path, "developer/developer_policy")
541    shutil.copyfile(src_policy_path, dest_policy_path)
542
543    src_policy_path = os.path.join(src_dir, "policy.31")
544    dest_policy_path = os.path.join(output_path, "user_policy")
545    shutil.copyfile(src_policy_path, dest_policy_path)
546
547
548def pre_check(args):
549    output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "pre_check/")
550    if not os.path.exists(output_path):
551        os.makedirs(output_path)
552    args.dst_file = os.path.join(output_path, "policy.31")
553
554    if args.debug_version == "enable":
555        args.debug_version = "disable"
556    else:
557        args.debug_version = "enable"
558
559    compile_sepolicy(args)
560
561
562def main(args):
563    with ThreadPoolExecutor(max_workers=2) as executor:
564        pre_check_args = copy.deepcopy(args)
565        pre_check_thread = executor.submit(pre_check, pre_check_args)
566
567        compile_args = copy.deepcopy(args)
568        compile_thread = executor.submit(compile_sepolicy, compile_args)
569
570        for future in as_completed([pre_check_thread, compile_thread]):
571            try:
572                future.result()
573            except Exception as e:
574                raise e
575
576    copy_user_policy(args)
577