1#!/usr/bin/env python 2# coding: utf-8 3 4""" 5Copyright (c) 2023 Huawei Device Co., Ltd. 6Licensed under the Apache License, Version 2.0 (the "License"); 7you may not use this file except in compliance with the License. 8You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12Unless required by applicable law or agreed to in writing, software 13distributed under the License is distributed on an "AS IS" BASIS, 14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15See the License for the specific language governing permissions and 16limitations under the License. 17 18""" 19 20import os 21import re 22import shutil 23import subprocess 24import tempfile 25from concurrent.futures import ThreadPoolExecutor, as_completed 26import copy 27 28SYSTEM_CIL_HASH = "system.cil.sha256" 29PREBUILD_SEPOLICY_SYSTEM_CIL_HASH = "prebuild_sepolicy.system.cil.sha256" 30 31# list of all macros and te for sepolicy build 32SEPOLICY_TYPE_LIST = ["security_classes", 33 "initial_sids", 34 "access_vectors", 35 "glb_perm_def.spt", 36 "glb_never_def.spt", 37 "mls", 38 "policy_cap", 39 "glb_te_def.spt", 40 "attributes", 41 ".te", 42 "glb_roles.spt", 43 "users", 44 "initial_sid_contexts", 45 "fs_use", 46 "virtfs_contexts", 47 ] 48 49POLICY_TYPE_LIST = ["allow", "auditallow", "dontaudit", 50 "allowx", "auditallowx", "dontauditx", 51 "neverallow", "neverallowx", ] 52 53 54class PolicyDirList(object): 55 def __init__(self, min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list): 56 self.min_policy_dir_list = min_policy_dir_list 57 self.system_policy_dir_list = system_policy_dir_list 58 self.vendor_policy_dir_list = vendor_policy_dir_list 59 self.public_policy_dir_list = public_policy_dir_list 60 61 62class PolicyFileList(object): 63 def __init__(self, min_policy_file_list, system_policy_file_list, vendor_policy_file_list, public_policy_file_list): 64 self.min_policy_file_list = min_policy_file_list 65 self.system_policy_file_list = system_policy_file_list 66 self.vendor_policy_file_list = vendor_policy_file_list 67 self.public_policy_file_list = public_policy_file_list 68 69 70def traverse_folder_in_dir_name(search_dir, folder_suffix): 71 folder_list = [] 72 for root, dirs, _ in sorted(os.walk(search_dir)): 73 for dir_i in dirs: 74 if dir_i == folder_suffix: 75 folder_list.append(os.path.join(root, dir_i)) 76 return folder_list 77 78 79def traverse_folder_in_type(search_dir, file_suffix, build_root): 80 policy_file_list = [] 81 flag = 0 82 for root, _, files in sorted(os.walk(search_dir)): 83 for each_file in files: 84 if each_file.endswith(file_suffix): 85 path = os.path.join(root, each_file) 86 rel_path = os.path.relpath(path, build_root) 87 flag |= check_empty_row(rel_path) 88 policy_file_list.append(rel_path) 89 policy_file_list.sort() 90 return policy_file_list, flag 91 92 93def traverse_file_in_each_type(folder_list, sepolicy_type_list, build_root): 94 policy_files_list = [] 95 err = 0 96 for policy_type in sepolicy_type_list: 97 for folder in folder_list: 98 type_file_list, flag = traverse_folder_in_type( 99 folder, policy_type, build_root) 100 err |= flag 101 if len(type_file_list) == 0: 102 continue 103 policy_files_list.extend(type_file_list) 104 if err: 105 raise Exception(err) 106 return policy_files_list 107 108 109def check_empty_row(policy_file): 110 """ 111 Check whether the last line of te file is empty. 112 :param policy_file: te file 113 :return: 114 """ 115 err = 0 116 with open(policy_file, 'r') as fp: 117 lines = fp.readlines() 118 if len(lines) == 0: 119 return 0 120 last_line = lines[-1] 121 if '\n' not in last_line: 122 print("".join([policy_file, " : need an empty line at end\n"])) 123 err = 1 124 return err 125 126 127def run_command(in_cmd): 128 cmdstr = " ".join(in_cmd) 129 ret = subprocess.run(cmdstr, shell=True).returncode 130 if ret != 0: 131 raise Exception(ret) 132 133 134def build_conf(args, output_conf, file_list, with_developer=False): 135 m4_args = ["-D", "build_with_debug=" + args.debug_version] 136 m4_args += ["-D", "build_with_updater=" + args.updater_version] 137 if with_developer: 138 m4_args += ["-D", "build_with_developer=enable"] 139 if getattr(args, "product_args", None): 140 for product_arg in args.product_args: 141 m4_args += ["-D", product_arg] 142 build_conf_cmd = ["m4", "-s", "--fatal-warnings"] + m4_args + file_list 143 with open(output_conf, 'w') as fd: 144 ret = subprocess.run(build_conf_cmd, shell=False, stdout=fd).returncode 145 if ret != 0: 146 raise Exception(ret) 147 148 149def build_cil(args, output_cil, input_conf): 150 check_policy_cmd = [os.path.join(args.tool_path, "checkpolicy"), 151 input_conf, 152 "-M -C -c 31", 153 "-o " + output_cil] 154 run_command(check_policy_cmd) 155 156 157def add_version(version, string): 158 return "".join([string, "_", version]) 159 160 161def simplify_string(string): 162 return string.replace('(', '').replace(')', '').replace('\n', '') 163 164 165def deal_with_roletype(version, cil_write, elem_list, type_set, file, line): 166 if len(elem_list) < 3: 167 print('Error: invalid roletype in %s:%d' % (file, line)) 168 raise Exception(1) 169 170 sub_string = simplify_string(elem_list[2]) 171 if sub_string in type_set: 172 cil_write.write('(typeattribute ' + add_version(version, sub_string) + ')\n') 173 elem_list[2] = elem_list[2].replace( 174 sub_string, add_version(version, sub_string)) 175 cil_write.write(" ".join(elem_list)) 176 177 178def deal_with_typeattribute(version, cil_write, elem_list, type_set, file, line): 179 if len(elem_list) < 2: 180 print('Error: invalid typeattribute in %s:%d' % (file, line)) 181 raise Exception(1) 182 183 sub_string = simplify_string(elem_list[1]) 184 if sub_string.startswith("base_typeattr_"): 185 elem_list[1] = elem_list[1].replace( 186 sub_string, add_version(version, sub_string)) 187 cil_write.write(" ".join(elem_list)) 188 189 190def deal_with_typeattributeset(version, cil_write, elem_list, type_set, file, line): 191 if len(elem_list) < 2: 192 print('Error: invalid typeattributeset in %s:%d' % (file, line)) 193 raise Exception(1) 194 195 for index, elem in enumerate(elem_list[1:]): 196 sub_string = simplify_string(elem) 197 if sub_string.startswith("base_typeattr_") or sub_string in type_set: 198 elem_list[index + 1] = elem.replace(sub_string, add_version(version, sub_string)) 199 cil_write.write(" ".join(elem_list)) 200 201 202def deal_with_policy(version, cil_write, elem_list, type_set, file, line): 203 if len(elem_list) < 4: 204 print('Error: invalid policy in %s:%d' % (file, line)) 205 raise Exception(1) 206 207 for index, elem in enumerate(elem_list[1:3]): 208 sub_string = simplify_string(elem) 209 if sub_string.startswith("base_typeattr_") or sub_string in type_set: 210 elem_list[index + 1] = elem.replace(sub_string, add_version(version, sub_string)) 211 cil_write.write(" ".join(elem_list)) 212 213 214def deal_with_type(version, cil_write, elem_list, file, line): 215 if len(elem_list) < 2: 216 print('Error: invalid type in %s:%d' % (file, line)) 217 raise Exception(1) 218 219 sub_string = simplify_string(elem_list[1]) 220 cil_write.write(" ".join(['(typeattributeset', add_version(version, sub_string), '(', sub_string, '))\n'])) 221 cil_write.write(" ".join(['(expandtypeattribute', '(', add_version(version, sub_string), ') true)\n'])) 222 cil_write.write(" ".join(['(typeattribute', add_version(version, sub_string), ')\n'])) 223 224 225def build_version_cil(version, cil_file_input, cil_file_output, type_set): 226 index = 0 227 with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write: 228 for line in cil_read: 229 index += 1 230 if not line.startswith('('): 231 continue 232 233 elem_list = line.split(' ') 234 if not elem_list: 235 continue 236 237 if elem_list[0] == '(type': 238 cil_write.write(line) 239 elif elem_list[0] == '(roletype': 240 deal_with_roletype(version, cil_write, elem_list, type_set, cil_file_input, line) 241 elif elem_list[0] == '(typeattribute': 242 deal_with_typeattribute(version, cil_write, elem_list, type_set, cil_file_input, line) 243 elif elem_list[0] == '(typeattributeset': 244 deal_with_typeattributeset(version, cil_write, elem_list, type_set, cil_file_input, line) 245 elif simplify_string(elem_list[0]) in POLICY_TYPE_LIST: 246 deal_with_policy(version, cil_write, elem_list, type_set, cil_file_input, line) 247 else: 248 cil_write.write(line) 249 250 251def build_type_version_cil(version, cil_file_input, cil_file_output): 252 index = 0 253 with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write: 254 for line in cil_read: 255 index += 1 256 if not line.startswith('('): 257 continue 258 259 elem_list = line.split(' ') 260 if not elem_list: 261 continue 262 263 if elem_list[0] == '(type': 264 deal_with_type(version, cil_write, elem_list, line, index) 265 266 267def get_type_set(cil_file_input): 268 pattern_type = re.compile(r'^\(type (.*)\)$') 269 pattern_typeattribute = re.compile(r'^\(type_attribute (base_typeattr_[0-9]+)\)$') 270 type_set = set() 271 with open(cil_file_input, 'r') as cil_read: 272 for line in cil_read: 273 match_type = pattern_type.match(line) 274 match_typeattribute = pattern_typeattribute.match(line) 275 if match_type: 276 type_set.add(match_type.group(1)) 277 elif match_typeattribute: 278 type_set.add(match_typeattribute.group(1)) 279 return type_set 280 281 282def add_base_typeattr_prefix(cil_file, prefix): 283 with open(cil_file, 'r') as cil_read: 284 data = cil_read.read() 285 data = data.replace('base_typeattr_', '_'.join([prefix, 'base_typeattr_'])) 286 with open(cil_file, 'w') as cil_write: 287 cil_write.write(data) 288 289 290def build_binary_policy(tool_path, output_policy, check_neverallow, cil_list): 291 build_policy_cmd = [os.path.join(tool_path, "secilc"), 292 " ".join(cil_list), 293 "-m -M true -G -c 31 -O", 294 "-f /dev/null", 295 "-o " + output_policy] 296 if not check_neverallow: 297 build_policy_cmd.append("-N") 298 run_command(build_policy_cmd) 299 300 301def prepare_build_path(dir_list, root_dir, build_dir_list, sepolicy_path): 302 build_policy_list = [os.path.join(sepolicy_path, "base"), os.path.join(sepolicy_path, "ohos_policy")] 303 build_policy_list += dir_list.split(":") 304 305 for i in build_policy_list: 306 if i == "" or i == "default": 307 continue 308 path = os.path.join(root_dir, i) 309 if (os.path.exists(path)): 310 build_dir_list.append(path) 311 else: 312 print("following path not exists!! {}".format(path)) 313 exit(-1) 314 315 316def get_policy_dir_list(args): 317 sepolicy_path = os.path.join(args.source_root_dir, "base/security/selinux_adapter/sepolicy/") 318 dir_list = [] 319 prepare_build_path(args.policy_dir_list, args.source_root_dir, dir_list, sepolicy_path) 320 min_policy_dir_list = [os.path.join(sepolicy_path, "min")] 321 system_policy = [] 322 public_policy = [] 323 vendor_policy = [] 324 325 for item in dir_list: 326 public_policy += traverse_folder_in_dir_name(item, "public") 327 system_policy += traverse_folder_in_dir_name(item, "system") 328 vendor_policy += traverse_folder_in_dir_name(item, "vendor") 329 330 # list of all policy folders 331 system_policy_dir_list = public_policy + system_policy 332 vendor_policy_dir_list = public_policy + vendor_policy + min_policy_dir_list 333 public_policy_dir_list = public_policy + min_policy_dir_list 334 335 # add temp dirs base/te folders 336 system_policy_dir_list.append(os.path.join(sepolicy_path, "base/te")) 337 vendor_policy_dir_list.append(os.path.join(sepolicy_path, "base/te")) 338 public_policy_dir_list.append(os.path.join(sepolicy_path, "base/te")) 339 340 return PolicyDirList(min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list) 341 342 343def get_policy_file_list(args, dir_list_object): 344 build_root = os.getcwd() 345 # list of all policy files 346 system_policy_file_list = traverse_file_in_each_type( 347 dir_list_object.system_policy_dir_list, SEPOLICY_TYPE_LIST, build_root) 348 vendor_policy_file_list = traverse_file_in_each_type( 349 dir_list_object.vendor_policy_dir_list, SEPOLICY_TYPE_LIST, build_root) 350 public_policy_file_list = traverse_file_in_each_type( 351 dir_list_object.public_policy_dir_list, SEPOLICY_TYPE_LIST, build_root) 352 min_policy_file_list = traverse_file_in_each_type( 353 dir_list_object.min_policy_dir_list, SEPOLICY_TYPE_LIST, build_root) 354 355 return PolicyFileList(min_policy_file_list, system_policy_file_list, vendor_policy_file_list, 356 public_policy_file_list) 357 358 359def filter_out(pattern_file, input_file): 360 with open(pattern_file, 'r', encoding='utf-8') as pat_file: 361 patterns = set(pat_file) 362 tmp_output = tempfile.NamedTemporaryFile() 363 with open(input_file, 'r', encoding='utf-8') as in_file: 364 tmp_output.writelines(line.encode(encoding='utf-8') for line in in_file.readlines() 365 if line not in patterns) 366 tmp_output.write("\n".encode(encoding='utf-8')) 367 tmp_output.flush() 368 shutil.copyfile(tmp_output.name, input_file) 369 370 371def generate_hash_file(input_file_list, output_file): 372 build_policy_cmd = ["cat", 373 " ".join(input_file_list), 374 "| sha256sum", 375 "| cut -d' ' -f1", 376 ">", 377 output_file] 378 run_command(build_policy_cmd) 379 380 381def generate_version_file(args, output_file): 382 cmd = ["echo", args.vendor_policy_version, 383 ">", output_file] 384 run_command(cmd) 385 386 387def generate_default_policy(args, policy, cil_list, with_developer=False): 388 if with_developer: 389 output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/") 390 if not os.path.exists(output_path): 391 os.makedirs(output_path) 392 else: 393 output_path = os.path.abspath(os.path.dirname(args.dst_file)) 394 system_output_conf = os.path.join(output_path, "system.conf") 395 vendor_output_conf = os.path.join(output_path, "vendor.conf") 396 min_output_conf = os.path.join(output_path, "min.conf") 397 398 system_cil_path = os.path.join(output_path, "system.cil") 399 vendor_cil_path = os.path.join(output_path, "vendor.cil") 400 min_cil_path = os.path.join(output_path, "min.cil") 401 402 # build system.conf 403 build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer) 404 # build system.cil 405 build_cil(args, system_cil_path, system_output_conf) 406 407 # build vendor.conf 408 build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer) 409 # build vendor.cil 410 build_cil(args, vendor_cil_path, vendor_output_conf) 411 add_base_typeattr_prefix(vendor_cil_path, 'vendor') 412 413 # build min.conf 414 build_conf(args, min_output_conf, policy.min_policy_file_list) 415 # build min.cil 416 build_cil(args, min_cil_path, min_output_conf) 417 418 filter_out(min_cil_path, vendor_cil_path) 419 420 cil_list += [vendor_cil_path, system_cil_path] 421 422 423def generate_special_policy(args, policy, cil_list, with_developer=False): 424 if with_developer: 425 output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/") 426 if not os.path.exists(output_path): 427 os.makedirs(output_path) 428 else: 429 output_path = os.path.abspath(os.path.dirname(args.dst_file)) 430 431 compatible_dir = os.path.join(output_path, "compatible/") 432 if not os.path.exists(compatible_dir): 433 os.makedirs(compatible_dir) 434 435 system_output_conf = os.path.join(output_path, "system.conf") 436 vendor_output_conf = os.path.join(output_path, "vendor.conf") 437 public_output_conf = os.path.join(output_path, "public.conf") 438 min_output_conf = os.path.join(output_path, "min.conf") 439 440 vendor_origin_cil_path = os.path.join(output_path, "vendor_origin.cil") 441 public_origin_cil_path = os.path.join(output_path, "public_origin.cil") 442 min_cil_path = os.path.join(output_path, "min.cil") 443 444 # output file 445 system_cil_path = os.path.join(output_path, "system.cil") 446 vendor_cil_path = os.path.join(output_path, "vendor.cil") 447 public_version_cil_path = os.path.join(output_path, "public.cil") 448 type_version_cil_path = os.path.join(output_path, "".join(["compatible/", args.vendor_policy_version, ".cil"])) 449 450 # build system.conf 451 build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer) 452 # build system.cil 453 build_cil(args, system_cil_path, system_output_conf) 454 455 # build min.cil 456 build_conf(args, min_output_conf, policy.min_policy_file_list) 457 build_cil(args, min_cil_path, min_output_conf) 458 459 # build public.cil 460 build_conf(args, public_output_conf, policy.public_policy_file_list, with_developer) 461 build_cil(args, public_origin_cil_path, public_output_conf) 462 type_set = get_type_set(public_origin_cil_path) 463 filter_out(min_cil_path, public_origin_cil_path) 464 build_version_cil(args.vendor_policy_version, public_origin_cil_path, public_version_cil_path, type_set) 465 466 # build vendor.cil 467 build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer) 468 build_cil(args, vendor_origin_cil_path, vendor_output_conf) 469 filter_out(min_cil_path, vendor_origin_cil_path) 470 build_version_cil(args.vendor_policy_version, vendor_origin_cil_path, vendor_cil_path, type_set) 471 filter_out(public_version_cil_path, vendor_cil_path) 472 473 build_type_version_cil(args.vendor_policy_version, public_origin_cil_path, type_version_cil_path) 474 475 if args.components == "system": 476 generate_hash_file([system_cil_path, type_version_cil_path], 477 os.path.join(output_path, SYSTEM_CIL_HASH)) 478 479 elif args.components == "vendor": 480 generate_hash_file([system_cil_path, type_version_cil_path], os.path.join( 481 output_path, PREBUILD_SEPOLICY_SYSTEM_CIL_HASH)) 482 483 version_file = os.path.join(output_path, "version") 484 generate_version_file(args, version_file) 485 486 cil_list += [vendor_cil_path, system_cil_path, type_version_cil_path, public_version_cil_path] 487 488 489def compile_sepolicy(args): 490 dir_list_object = get_policy_dir_list(args) 491 file_list_object = get_policy_file_list(args, dir_list_object) 492 cil_list = [] 493 developer_cil_list = [] 494 495 if args.updater_version == "enable": 496 generate_default_policy(args, file_list_object, cil_list, False) 497 build_binary_policy(args.tool_path, args.dst_file, True, cil_list) 498 return 499 500 with ThreadPoolExecutor(max_workers=2) as executor: 501 if args.components == "default": 502 normal_thread = executor.submit(generate_default_policy, args, file_list_object, cil_list, False) 503 developer_thread = executor.submit(generate_default_policy, args, file_list_object, 504 developer_cil_list, True) 505 else: 506 normal_thread = executor.submit(generate_special_policy, args, file_list_object, cil_list, False) 507 developer_thread = executor.submit(generate_special_policy, args, file_list_object, 508 developer_cil_list, True) 509 510 for future in as_completed([normal_thread, developer_thread]): 511 try: 512 future.result() 513 except Exception as e: 514 raise e 515 516 with ThreadPoolExecutor(max_workers=2) as executor: 517 build_normal_thread = executor.submit(build_binary_policy, args.tool_path, args.dst_file, True, cil_list) 518 developer_policy_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/policy.31") 519 build_developer_thread = executor.submit(build_binary_policy, args.tool_path, developer_policy_path, 520 True, developer_cil_list) 521 522 for future in as_completed([build_normal_thread, build_developer_thread]): 523 try: 524 future.result() 525 except Exception as e: 526 raise e 527 528 529def copy_user_policy(args): 530 if args.updater_version == "enable": 531 return 532 533 output_path = os.path.abspath(os.path.dirname(args.dst_file)) 534 if args.debug_version == "enable": 535 src_dir = os.path.join(output_path, "pre_check/") 536 else: 537 src_dir = output_path 538 539 src_policy_path = os.path.join(src_dir, "developer/policy.31") 540 dest_policy_path = os.path.join(output_path, "developer/developer_policy") 541 shutil.copyfile(src_policy_path, dest_policy_path) 542 543 src_policy_path = os.path.join(src_dir, "policy.31") 544 dest_policy_path = os.path.join(output_path, "user_policy") 545 shutil.copyfile(src_policy_path, dest_policy_path) 546 547 548def pre_check(args): 549 output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "pre_check/") 550 if not os.path.exists(output_path): 551 os.makedirs(output_path) 552 args.dst_file = os.path.join(output_path, "policy.31") 553 554 if args.debug_version == "enable": 555 args.debug_version = "disable" 556 else: 557 args.debug_version = "enable" 558 559 compile_sepolicy(args) 560 561 562def main(args): 563 with ThreadPoolExecutor(max_workers=2) as executor: 564 pre_check_args = copy.deepcopy(args) 565 pre_check_thread = executor.submit(pre_check, pre_check_args) 566 567 compile_args = copy.deepcopy(args) 568 compile_thread = executor.submit(compile_sepolicy, compile_args) 569 570 for future in as_completed([pre_check_thread, compile_thread]): 571 try: 572 future.result() 573 except Exception as e: 574 raise e 575 576 copy_user_policy(args) 577