#!/usr/bin/env python
# coding: utf-8

"""
Copyright (c) 2023 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

import copy
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed

SYSTEM_CIL_HASH = "system.cil.sha256"
PREBUILD_SEPOLICY_SYSTEM_CIL_HASH = "prebuild_sepolicy.system.cil.sha256"

# list of all macros and te for sepolicy build
# (files are collected suffix by suffix, in exactly this order)
SEPOLICY_TYPE_LIST = ["security_classes",
                      "initial_sids",
                      "access_vectors",
                      "glb_perm_def.spt",
                      "glb_never_def.spt",
                      "mls",
                      "policy_cap",
                      "glb_te_def.spt",
                      "attributes",
                      ".te",
                      "glb_roles.spt",
                      "users",
                      "initial_sid_contexts",
                      "fs_use",
                      "virtfs_contexts",
                      ]

# CIL statement keywords whose 2nd and 3rd fields are rewritten by deal_with_policy
POLICY_TYPE_LIST = ["allow", "auditallow", "dontaudit",
                    "allowx", "auditallowx", "dontauditx",
                    "neverallow", "neverallowx", ]


class PolicyDirList(object):
    """Plain holder for the four policy directory lists (min/system/vendor/public)."""

    def __init__(self, min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list):
        self.min_policy_dir_list = min_policy_dir_list
        self.system_policy_dir_list = system_policy_dir_list
        self.vendor_policy_dir_list = vendor_policy_dir_list
        self.public_policy_dir_list = public_policy_dir_list


class PolicyFileList(object):
    """Plain holder for the four policy file lists (min/system/vendor/public)."""

    def __init__(self, min_policy_file_list, system_policy_file_list, vendor_policy_file_list, public_policy_file_list):
        self.min_policy_file_list = min_policy_file_list
        self.system_policy_file_list = system_policy_file_list
        self.vendor_policy_file_list = vendor_policy_file_list
        self.public_policy_file_list = public_policy_file_list


def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Collect every directory below search_dir whose name equals folder_suffix.
    :param search_dir: root of the directory tree to walk
    :param folder_suffix: exact directory name to look for
    :return: list of matching directory paths, ordered by parent directory
    """
    folder_list = []
    # sorted() orders the walk by root path so the result does not depend on
    # the file system's enumeration order.
    for root, dirs, _ in sorted(os.walk(search_dir)):
        folder_list.extend(os.path.join(root, name) for name in dirs if name == folder_suffix)
    return folder_list


def traverse_folder_in_type(search_dir, file_suffix, build_root):
    """
    Collect every file below search_dir whose name ends with file_suffix.
    :param search_dir: root of the directory tree to walk
    :param file_suffix: file name suffix to match
    :param build_root: directory the returned paths are made relative to
    :return: (sorted list of paths relative to build_root,
              1 if any matched file lacks a trailing newline, else 0)
    """
    policy_file_list = []
    flag = 0
    for root, _, files in sorted(os.walk(search_dir)):
        for each_file in files:
            if not each_file.endswith(file_suffix):
                continue
            rel_path = os.path.relpath(os.path.join(root, each_file), build_root)
            # NOTE(review): the relative path is opened as-is, which presumably
            # relies on the current working directory being build_root - confirm.
            flag |= check_empty_row(rel_path)
            policy_file_list.append(rel_path)
    policy_file_list.sort()
    return policy_file_list, flag


def traverse_file_in_each_type(folder_list, sepolicy_type_list, build_root):
    """
    Collect policy files from all folders, grouped by suffix in the given order.
    :param folder_list: directories to search
    :param sepolicy_type_list: file suffixes; the outer ordering of the result
    :param build_root: directory the returned paths are made relative to
    :return: list of relative file paths
    :raises Exception: if any collected file lacks a trailing newline
    """
    policy_files_list = []
    err = 0
    for policy_type in sepolicy_type_list:
        for folder in folder_list:
            type_file_list, flag = traverse_folder_in_type(folder, policy_type, build_root)
            err |= flag
            policy_files_list.extend(type_file_list)
    # Raise only after scanning everything so every offending file gets reported.
    if err:
        raise Exception(err)
    return policy_files_list


def check_empty_row(policy_file):
    """
    Check whether a policy file ends with a newline character.
    A message is printed when it does not.
    :param policy_file: path of the file to check
    :return: 0 if the file is empty or ends with a newline, 1 otherwise
    """
    last_line = None
    with open(policy_file, 'r') as fp:
        for last_line in fp:
            pass
    if last_line is None or last_line.endswith('\n'):
        return 0
    print("".join([policy_file, " : need an empty line at end\n"]))
    return 1


def run_command(in_cmd):
    """
    Join in_cmd with spaces and run the result through the shell.
    The shell interprets the joined string, so only pass trusted arguments.
    :param in_cmd: list of command fragments
    :raises Exception: carrying the exit status, if it is non-zero
    """
    cmdstr = " ".join(in_cmd)
    ret = subprocess.run(cmdstr, shell=True).returncode
    if ret != 0:
        raise Exception(ret)


def _run_argv(argv):
    """Run argv without a shell; raise Exception(exit status) if it is non-zero."""
    ret = subprocess.run(argv, shell=False).returncode
    if ret != 0:
        raise Exception(ret)


def build_conf(args, output_conf, file_list, with_developer=False):
    """
    Run m4 over file_list and write its standard output to output_conf.
    :param args: object providing debug_version and updater_version strings
    :param output_conf: path of the file to write
    :param file_list: input files handed to m4, in order
    :param with_developer: also define build_with_developer=enable
    :raises Exception: carrying m4's exit status, if it is non-zero
    """
    m4_args = ["-D", "build_with_debug=" + args.debug_version]
    m4_args += ["-D", "build_with_updater=" + args.updater_version]
    if with_developer:
        m4_args += ["-D", "build_with_developer=enable"]
    build_conf_cmd = ["m4", "-s", "--fatal-warnings"] + m4_args + file_list
    with open(output_conf, 'w') as fd:
        ret = subprocess.run(build_conf_cmd, shell=False, stdout=fd).returncode
    if ret != 0:
        raise Exception(ret)


def build_cil(args, output_cil, input_conf):
    """
    Run checkpolicy from args.tool_path on input_conf, writing output_cil.
    :param args: object providing tool_path
    :param output_cil: path passed to checkpolicy's -o option
    :param input_conf: policy.conf style input file
    :raises Exception: if checkpolicy exits with a non-zero status
    """
    # An argv list (no shell) keeps paths with spaces or metacharacters intact.
    _run_argv([os.path.join(args.tool_path, "checkpolicy"),
               input_conf,
               "-M", "-C", "-c", "31",
               "-o", output_cil])


def add_version(version, string):
    """Return string suffixed with '_' and version."""
    return "".join([string, "_", version])


def simplify_string(string):
    """Return string with every '(', ')' and newline character removed."""
    return string.replace('(', '').replace(')', '').replace('\n', '')


def deal_with_roletype(version, cil_write, elem_list, type_set, file, line):
    """
    Write a roletype statement, versioning its third field when it is a known type.
    For a known type a typeattribute declaration of the versioned name is written first.
    :param version: version suffix
    :param cil_write: writable text file object
    :param elem_list: the statement split on single spaces (modified in place)
    :param type_set: names that must be versioned
    :param file: input file name, used in the error message
    :param line: input line number, used in the error message
    :raises Exception: if the statement has fewer than three fields
    """
    if len(elem_list) < 3:
        print('Error: invalid roletype in %s:%d' % (file, line))
        raise Exception(1)

    sub_string = simplify_string(elem_list[2])
    if sub_string in type_set:
        versioned = add_version(version, sub_string)
        cil_write.write('(typeattribute ' + versioned + ')\n')
        elem_list[2] = elem_list[2].replace(sub_string, versioned)
    cil_write.write(" ".join(elem_list))


def deal_with_typeattribute(version, cil_write, elem_list, type_set, file, line):
    """
    Write a typeattribute statement, versioning base_typeattr_* names.
    :param version: version suffix
    :param cil_write: writable text file object
    :param elem_list: the statement split on single spaces (modified in place)
    :param type_set: unused here; kept for a signature parallel to the other helpers
    :param file: input file name, used in the error message
    :param line: input line number, used in the error message
    :raises Exception: if the statement has fewer than two fields
    """
    if len(elem_list) < 2:
        print('Error: invalid typeattribute in %s:%d' % (file, line))
        raise Exception(1)

    sub_string = simplify_string(elem_list[1])
    if sub_string.startswith("base_typeattr_"):
        elem_list[1] = elem_list[1].replace(sub_string, add_version(version, sub_string))
    cil_write.write(" ".join(elem_list))


def _version_fields(version, elem_list, start, stop, type_set):
    """Version, in place, fields [start, stop) naming a base_typeattr_* or a member of type_set."""
    for position in range(start, min(stop, len(elem_list))):
        elem = elem_list[position]
        sub_string = simplify_string(elem)
        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
            elem_list[position] = elem.replace(sub_string, add_version(version, sub_string))


def deal_with_typeattributeset(version, cil_write, elem_list, type_set, file, line):
    """
    Write a typeattributeset statement with every field after the keyword versioned
    when it names a base_typeattr_* or a member of type_set.
    :param version: version suffix
    :param cil_write: writable text file object
    :param elem_list: the statement split on single spaces (modified in place)
    :param type_set: names that must be versioned
    :param file: input file name, used in the error message
    :param line: input line number, used in the error message
    :raises Exception: if the statement has fewer than two fields
    """
    if len(elem_list) < 2:
        print('Error: invalid typeattributeset in %s:%d' % (file, line))
        raise Exception(1)

    _version_fields(version, elem_list, 1, len(elem_list), type_set)
    cil_write.write(" ".join(elem_list))


def deal_with_policy(version, cil_write, elem_list, type_set, file, line):
    """
    Write an access rule, versioning its second and third fields when they name
    a base_typeattr_* or a member of type_set.
    :param version: version suffix
    :param cil_write: writable text file object
    :param elem_list: the statement split on single spaces (modified in place)
    :param type_set: names that must be versioned
    :param file: input file name, used in the error message
    :param line: input line number, used in the error message
    :raises Exception: if the statement has fewer than four fields
    """
    if len(elem_list) < 4:
        print('Error: invalid policy in %s:%d' % (file, line))
        raise Exception(1)

    _version_fields(version, elem_list, 1, 3, type_set)
    cil_write.write(" ".join(elem_list))


def deal_with_type(version, cil_write, elem_list, file, line):
    """
    For a type statement, write the three statements that define its versioned
    attribute: typeattributeset, expandtypeattribute and typeattribute.
    :param version: version suffix
    :param cil_write: writable text file object
    :param elem_list: the statement split on single spaces
    :param file: input file name, used in the error message
    :param line: input line number, used in the error message
    :raises Exception: if the statement has fewer than two fields
    """
    if len(elem_list) < 2:
        print('Error: invalid type in %s:%d' % (file, line))
        raise Exception(1)

    sub_string = simplify_string(elem_list[1])
    versioned = add_version(version, sub_string)
    cil_write.write(" ".join(['(typeattributeset', versioned, '(', sub_string, '))\n']))
    cil_write.write(" ".join(['(expandtypeattribute', '(', versioned, ') true)\n']))
    cil_write.write(" ".join(['(typeattribute', versioned, ')\n']))


def build_version_cil(version, cil_file_input, cil_file_output, type_set):
    """
    Copy a CIL file, rewriting names in roletype, typeattribute, typeattributeset
    and access-rule statements to their versioned form.
    Lines that do not start with '(' are dropped; other statements are copied verbatim.
    :param version: version suffix
    :param cil_file_input: path of the CIL file to read
    :param cil_file_output: path of the CIL file to write
    :param type_set: names that must be versioned
    :raises Exception: if a handled statement has too few fields
    """
    handlers = {
        '(roletype': deal_with_roletype,
        '(typeattribute': deal_with_typeattribute,
        '(typeattributeset': deal_with_typeattributeset,
    }
    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
        for index, line in enumerate(cil_read, start=1):
            if not line.startswith('('):
                continue

            elem_list = line.split(' ')
            handler = handlers.get(elem_list[0])
            if handler is None and simplify_string(elem_list[0]) in POLICY_TYPE_LIST:
                handler = deal_with_policy

            if handler is None:
                # '(type ...' and every unhandled statement are copied verbatim.
                cil_write.write(line)
            else:
                # Pass the line number: the handlers format it with %d.
                handler(version, cil_write, elem_list, type_set, cil_file_input, index)


def build_type_version_cil(version, cil_file_input, cil_file_output):
    """
    Write the versioned attribute definitions for every type statement of a CIL file.
    :param version: version suffix
    :param cil_file_input: path of the CIL file to read
    :param cil_file_output: path of the CIL file to write
    :raises Exception: if a type statement has too few fields
    """
    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
        for index, line in enumerate(cil_read, start=1):
            if not line.startswith('('):
                continue

            elem_list = line.split(' ')
            if elem_list[0] == '(type':
                deal_with_type(version, cil_write, elem_list, cil_file_input, index)


def get_type_set(cil_file_input):
    """
    Collect the names declared by '(type NAME)' lines of a CIL file.
    :param cil_file_input: path of the CIL file to read
    :return: set of names
    """
    pattern_type = re.compile(r'^\(type (.*)\)$')
    # NOTE(review): this pattern spells the keyword 'type_attribute', while the
    # rest of this file handles '(typeattribute' statements, so it presumably
    # never matches. Left unchanged because the callers already version
    # base_typeattr_* names by prefix - confirm before correcting.
    pattern_typeattribute = re.compile(r'^\(type_attribute (base_typeattr_[0-9]+)\)$')
    type_set = set()
    with open(cil_file_input, 'r') as cil_read:
        for line in cil_read:
            match = pattern_type.match(line) or pattern_typeattribute.match(line)
            if match:
                type_set.add(match.group(1))
    return type_set


def add_base_typeattr_prefix(cil_file, prefix):
    """
    Rewrite cil_file in place, replacing every 'base_typeattr_' with
    '<prefix>_base_typeattr_'.
    :param cil_file: path of the file to rewrite
    :param prefix: prefix to add
    """
    with open(cil_file, 'r') as cil_read:
        data = cil_read.read()
    data = data.replace('base_typeattr_', '_'.join([prefix, 'base_typeattr_']))
    with open(cil_file, 'w') as cil_write:
        cil_write.write(data)


def build_binary_policy(tool_path, output_policy, check_neverallow, cil_list):
    """
    Run secilc from tool_path over cil_list to produce output_policy.
    :param tool_path: directory containing the secilc executable
    :param output_policy: path passed to secilc's -o option
    :param check_neverallow: when false, '-N' is appended to the command
    :param cil_list: CIL input files
    :raises Exception: if secilc exits with a non-zero status
    """
    build_policy_cmd = [os.path.join(tool_path, "secilc")]
    build_policy_cmd += cil_list
    build_policy_cmd += ["-m", "-M", "true", "-G", "-c", "31",
                         "-f", "/dev/null",
                         "-o", output_policy]
    if not check_neverallow:
        build_policy_cmd.append("-N")
    _run_argv(build_policy_cmd)


def prepare_build_path(dir_list, root_dir, build_dir_list, sepolicy_path):
    """
    Append the policy source directories to build_dir_list.
    The 'base' and 'ohos_policy' directories of sepolicy_path come first, then
    the entries of dir_list; empty entries and 'default' are skipped.
    :param dir_list: ':' separated directory names
    :param root_dir: directory every entry is joined onto
    :param build_dir_list: output list, extended in place
    :param sepolicy_path: sepolicy directory
    :raises SystemExit: with code -1 if a directory does not exist
    """
    build_policy_list = [os.path.join(sepolicy_path, "base"), os.path.join(sepolicy_path, "ohos_policy")]
    build_policy_list += dir_list.split(":")

    for i in build_policy_list:
        if i == "" or i == "default":
            continue
        path = os.path.join(root_dir, i)
        if not os.path.exists(path):
            print("following path not exists!! {}".format(path))
            # Same effect as exit(-1), without relying on the site-provided helper.
            raise SystemExit(-1)
        build_dir_list.append(path)


def get_policy_dir_list(args):
    """
    Work out which directories feed the min, system, vendor and public policies.
    :param args: object providing source_root_dir and policy_dir_list
    :return: PolicyDirList
    """
    sepolicy_path = os.path.join(args.source_root_dir, "base/security/selinux_adapter/sepolicy/")
    dir_list = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, dir_list, sepolicy_path)
    min_policy_dir_list = [os.path.join(sepolicy_path, "min")]
    system_policy = []
    public_policy = []
    vendor_policy = []

    for item in dir_list:
        public_policy += traverse_folder_in_dir_name(item, "public")
        system_policy += traverse_folder_in_dir_name(item, "system")
        vendor_policy += traverse_folder_in_dir_name(item, "vendor")

    # list of all policy folders
    system_policy_dir_list = public_policy + system_policy
    vendor_policy_dir_list = public_policy + vendor_policy + min_policy_dir_list
    public_policy_dir_list = public_policy + min_policy_dir_list

    # add temp dirs base/te folders
    base_te_dir = os.path.join(sepolicy_path, "base/te")
    system_policy_dir_list.append(base_te_dir)
    vendor_policy_dir_list.append(base_te_dir)
    public_policy_dir_list.append(base_te_dir)

    return PolicyDirList(min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list)


def get_policy_file_list(args, dir_list_object):
    """
    Collect the policy files of each directory list.
    :param args: object providing tool_path; the build root is three levels above it
    :param dir_list_object: PolicyDirList
    :return: PolicyFileList with paths relative to the build root
    :raises Exception: if any collected file lacks a trailing newline
    """
    build_root = os.path.abspath(os.path.join(args.tool_path, "../../.."))
    # list of all policy files
    system_policy_file_list = traverse_file_in_each_type(
        dir_list_object.system_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
    vendor_policy_file_list = traverse_file_in_each_type(
        dir_list_object.vendor_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
    public_policy_file_list = traverse_file_in_each_type(
        dir_list_object.public_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
    min_policy_file_list = traverse_file_in_each_type(
        dir_list_object.min_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)

    return PolicyFileList(min_policy_file_list, system_policy_file_list, vendor_policy_file_list,
                          public_policy_file_list)


def filter_out(pattern_file, input_file):
    """
    Rewrite input_file in place without the lines that also occur in pattern_file.
    Lines are compared whole, including their line ending. One extra newline is
    appended to the result.
    :param pattern_file: file whose lines are removed
    :param input_file: file to rewrite
    """
    with open(pattern_file, 'r', encoding='utf-8') as pat_file:
        patterns = set(pat_file)
    with open(input_file, 'r', encoding='utf-8') as in_file:
        kept_lines = [line for line in in_file if line not in patterns]
    # Everything is in memory before the file is truncated, so no temporary
    # file (and no handle left open) is needed.
    with open(input_file, 'w', encoding='utf-8', newline='\n') as out_file:
        out_file.writelines(kept_lines)
        out_file.write("\n")


def generate_hash_file(input_file_list, output_file):
    """
    Write the SHA-256 hex digest of the concatenated input files, followed by a
    newline, to output_file.
    :param input_file_list: files to hash, in order
    :param output_file: path of the file to write
    :raises OSError: if an input file cannot be read
    """
    digest = hashlib.sha256()
    for input_file in input_file_list:
        with open(input_file, 'rb') as fp:
            for chunk in iter(lambda: fp.read(65536), b''):
                digest.update(chunk)
    with open(output_file, 'w') as out_file:
        out_file.write(digest.hexdigest() + "\n")


def generate_version_file(args, output_file):
    """
    Write args.vendor_policy_version followed by a newline to output_file.
    :param args: object providing vendor_policy_version
    :param output_file: path of the file to write
    """
    with open(output_file, 'w') as out_file:
        out_file.write(args.vendor_policy_version + "\n")


def _resolve_output_path(args, with_developer):
    """Return the directory of args.dst_file, or its 'developer/' child (created) when with_developer."""
    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    if with_developer:
        output_path = os.path.join(output_path, "developer/")
        os.makedirs(output_path, exist_ok=True)
    return output_path


def generate_default_policy(args, policy, cil_list, with_developer=False):
    """
    Build system.cil, vendor.cil and min.cil next to args.dst_file.
    vendor.cil gets its base_typeattr_ names prefixed with 'vendor' and has the
    lines of min.cil removed.
    :param args: build arguments (dst_file, tool_path, debug_version, updater_version)
    :param policy: PolicyFileList
    :param cil_list: output list; vendor.cil and system.cil are appended in place
    :param with_developer: build into the 'developer/' sub directory with
                           build_with_developer defined
    """
    output_path = _resolve_output_path(args, with_developer)
    system_output_conf = os.path.join(output_path, "system.conf")
    vendor_output_conf = os.path.join(output_path, "vendor.conf")
    min_output_conf = os.path.join(output_path, "min.conf")

    system_cil_path = os.path.join(output_path, "system.cil")
    vendor_cil_path = os.path.join(output_path, "vendor.cil")
    min_cil_path = os.path.join(output_path, "min.cil")

    # build system.conf
    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
    # build system.cil
    build_cil(args, system_cil_path, system_output_conf)

    # build vendor.conf
    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
    # build vendor.cil
    build_cil(args, vendor_cil_path, vendor_output_conf)
    add_base_typeattr_prefix(vendor_cil_path, 'vendor')

    # build min.conf (never with build_with_developer)
    build_conf(args, min_output_conf, policy.min_policy_file_list)
    # build min.cil
    build_cil(args, min_cil_path, min_output_conf)

    filter_out(min_cil_path, vendor_cil_path)

    cil_list += [vendor_cil_path, system_cil_path]


def generate_special_policy(args, policy, cil_list, with_developer=False):
    """
    Build the versioned policy set next to args.dst_file: system.cil, vendor.cil,
    public.cil and compatible/<vendor_policy_version>.cil, plus a 'version' file
    and, depending on args.components, a hash file.
    :param args: build arguments (dst_file, tool_path, debug_version,
                 updater_version, vendor_policy_version, components)
    :param policy: PolicyFileList
    :param cil_list: output list; the four generated CIL files are appended in place
    :param with_developer: build into the 'developer/' sub directory with
                           build_with_developer defined
    """
    output_path = _resolve_output_path(args, with_developer)
    os.makedirs(os.path.join(output_path, "compatible/"), exist_ok=True)

    system_output_conf = os.path.join(output_path, "system.conf")
    vendor_output_conf = os.path.join(output_path, "vendor.conf")
    public_output_conf = os.path.join(output_path, "public.conf")
    min_output_conf = os.path.join(output_path, "min.conf")

    vendor_origin_cil_path = os.path.join(output_path, "vendor_origin.cil")
    public_origin_cil_path = os.path.join(output_path, "public_origin.cil")
    min_cil_path = os.path.join(output_path, "min.cil")

    # output file
    system_cil_path = os.path.join(output_path, "system.cil")
    vendor_cil_path = os.path.join(output_path, "vendor.cil")
    public_version_cil_path = os.path.join(output_path, "public.cil")
    type_version_cil_path = os.path.join(output_path, "".join(["compatible/", args.vendor_policy_version, ".cil"]))

    # build system.conf
    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
    # build system.cil
    build_cil(args, system_cil_path, system_output_conf)

    # build min.cil (never with build_with_developer)
    build_conf(args, min_output_conf, policy.min_policy_file_list)
    build_cil(args, min_cil_path, min_output_conf)

    # build public.cil
    build_conf(args, public_output_conf, policy.public_policy_file_list, with_developer)
    build_cil(args, public_origin_cil_path, public_output_conf)
    # The type set is taken before min.cil's lines are filtered out.
    type_set = get_type_set(public_origin_cil_path)
    filter_out(min_cil_path, public_origin_cil_path)
    build_version_cil(args.vendor_policy_version, public_origin_cil_path, public_version_cil_path, type_set)

    # build vendor.cil
    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
    build_cil(args, vendor_origin_cil_path, vendor_output_conf)
    filter_out(min_cil_path, vendor_origin_cil_path)
    build_version_cil(args.vendor_policy_version, vendor_origin_cil_path, vendor_cil_path, type_set)
    filter_out(public_version_cil_path, vendor_cil_path)

    build_type_version_cil(args.vendor_policy_version, public_origin_cil_path, type_version_cil_path)

    if args.components == "system":
        generate_hash_file([system_cil_path, type_version_cil_path],
                           os.path.join(output_path, SYSTEM_CIL_HASH))

    elif args.components == "vendor":
        generate_hash_file([system_cil_path, type_version_cil_path], os.path.join(
            output_path, PREBUILD_SEPOLICY_SYSTEM_CIL_HASH))

    version_file = os.path.join(output_path, "version")
    generate_version_file(args, version_file)

    cil_list += [vendor_cil_path, system_cil_path, type_version_cil_path, public_version_cil_path]


def _wait_all(futures):
    """Wait for every future; re-raise the first exception seen in completion order."""
    for future in as_completed(futures):
        future.result()


def compile_sepolicy(args):
    """
    Generate the CIL files and compile them into binary policies.
    With updater_version == "enable" only the default policy is built, into
    args.dst_file. Otherwise a normal and a developer variant are generated in
    parallel and then compiled in parallel, into args.dst_file and
    'developer/policy.31' next to it.
    :param args: build arguments
    """
    dir_list_object = get_policy_dir_list(args)
    file_list_object = get_policy_file_list(args, dir_list_object)
    cil_list = []
    developer_cil_list = []

    if args.updater_version == "enable":
        generate_default_policy(args, file_list_object, cil_list, False)
        build_binary_policy(args.tool_path, args.dst_file, True, cil_list)
        return

    if args.components == "default":
        generate_policy = generate_default_policy
    else:
        generate_policy = generate_special_policy

    # Each variant fills its own list and writes to its own output directory.
    with ThreadPoolExecutor(max_workers=2) as executor:
        normal_thread = executor.submit(generate_policy, args, file_list_object, cil_list, False)
        developer_thread = executor.submit(generate_policy, args, file_list_object, developer_cil_list, True)
        _wait_all([normal_thread, developer_thread])

    with ThreadPoolExecutor(max_workers=2) as executor:
        build_normal_thread = executor.submit(build_binary_policy, args.tool_path, args.dst_file, True, cil_list)
        developer_policy_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/policy.31")
        build_developer_thread = executor.submit(build_binary_policy, args.tool_path, developer_policy_path,
                                                 True, developer_cil_list)
        _wait_all([build_normal_thread, build_developer_thread])


def copy_user_policy(args):
    """
    Copy the policies built without debug into 'user_policy' and
    'developer/developer_policy' next to args.dst_file.
    Nothing is done when updater_version == "enable". When debug_version ==
    "enable" the sources are taken from the 'pre_check/' sub directory (where
    pre_check builds with the debug setting flipped), otherwise from the output
    directory itself.
    :param args: build arguments (dst_file, debug_version, updater_version)
    """
    if args.updater_version == "enable":
        return

    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    if args.debug_version == "enable":
        src_dir = os.path.join(output_path, "pre_check/")
    else:
        src_dir = output_path

    src_policy_path = os.path.join(src_dir, "developer/policy.31")
    dest_policy_path = os.path.join(output_path, "developer/developer_policy")
    shutil.copyfile(src_policy_path, dest_policy_path)

    src_policy_path = os.path.join(src_dir, "policy.31")
    dest_policy_path = os.path.join(output_path, "user_policy")
    shutil.copyfile(src_policy_path, dest_policy_path)


def pre_check(args):
    """
    Compile the policy a second time with the debug setting flipped, into the
    'pre_check/' sub directory of the output directory.
    :param args: build arguments; dst_file and debug_version are modified in
                 place, so pass a copy
    """
    output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "pre_check/")
    os.makedirs(output_path, exist_ok=True)
    args.dst_file = os.path.join(output_path, "policy.31")

    if args.debug_version == "enable":
        args.debug_version = "disable"
    else:
        args.debug_version = "enable"

    compile_sepolicy(args)


def main(args):
    """
    Build the requested policy and the flipped-debug pre-check policy in
    parallel, then copy the user policies into place.
    :param args: build arguments; left unmodified (each task gets a deep copy)
    """
    with ThreadPoolExecutor(max_workers=2) as executor:
        pre_check_args = copy.deepcopy(args)
        pre_check_thread = executor.submit(pre_check, pre_check_args)

        compile_args = copy.deepcopy(args)
        compile_thread = executor.submit(compile_sepolicy, compile_args)

        _wait_all([pre_check_thread, compile_thread])

    copy_user_policy(args)