1#!/usr/bin/env python 2# coding: utf-8 3 4""" 5Copyright (c) 2021-2022 Huawei Device Co., Ltd. 6Licensed under the Apache License, Version 2.0 (the "License"); 7you may not use this file except in compliance with the License. 8You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12Unless required by applicable law or agreed to in writing, software 13distributed under the License is distributed on an "AS IS" BASIS, 14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15See the License for the specific language governing permissions and 16limitations under the License. 17 18""" 19 20import os 21import argparse 22import re 23import shutil 24import subprocess 25from collections import defaultdict 26 27 28def parse_args(): 29 parser = argparse.ArgumentParser() 30 parser.add_argument( 31 '--dst-dir', help='the output dest path', required=True) 32 parser.add_argument('--tool-path', 33 help='the sefcontext_compile bin path', required=True) 34 parser.add_argument('--policy-file', 35 help='the policy.31 file', required=True) 36 parser.add_argument('--source-root-dir', 37 help='prj root path', required=True) 38 parser.add_argument('--policy_dir_list', 39 help='policy dirs need to be included', required=True) 40 parser.add_argument('--components', 41 help='system or vendor or default', required=True) 42 return parser.parse_args() 43 44 45def run_command(in_cmd): 46 cmdstr = " ".join(in_cmd) 47 ret = subprocess.run(cmdstr, shell=True).returncode 48 if ret != 0: 49 raise Exception(ret) 50 51 52def traverse_folder_in_type(search_dir_list, file_suffix): 53 """ 54 for special folder search_dir, find all files endwith file_suffix. 55 :param search_dir: path to search 56 :param file_suffix: postfix of file name 57 :return: file list 58 """ 59 flag = 0 60 policy_file_list = [] 61 62 for item in search_dir_list: 63 for root, _, files in sorted(os.walk(item)): 64 filtered_files = [f for f in files if f.endswith(file_suffix)] 65 for each_file in filtered_files: 66 file_list_path = os.path.join(root, each_file) 67 flag |= check_contexts_file(file_list_path) 68 policy_file_list.append(file_list_path) 69 70 if flag: 71 raise Exception(flag) 72 policy_file_list.sort() 73 return " ".join(str(x) for x in policy_file_list) 74 75 76def check_contexts_file(contexts_file): 77 """ 78 Check the format of contexts file. 79 :param contexts_file: list of te file 80 :return: 81 """ 82 err = 0 83 lines = [] 84 with open(contexts_file, 'rb') as fp: 85 lines = fp.readlines() 86 if len(lines) == 0: 87 return 0 88 last_line = lines[-1] 89 if b'\n' not in last_line: 90 print("".join((contexts_file, " : need an empty line at end \n"))) 91 err = 1 92 for line in lines: 93 if line.endswith(b'\r\n') or line.endswith(b'\r'): 94 print("".join((contexts_file, " : must be unix format\n"))) 95 err = 1 96 break 97 return err 98 99 100def combine_contexts_file(file_contexts_list, combined_file_contexts): 101 cat_cmd = ["cat", 102 file_contexts_list, 103 ">", combined_file_contexts + "_tmp"] 104 run_command(cat_cmd) 105 106 grep_cmd = ["grep -v ^#", 107 combined_file_contexts + "_tmp", 108 "| grep -v ^$", 109 ">", combined_file_contexts] 110 run_command(grep_cmd) 111 112 113def check_redefinition(contexts_file): 114 type_hash = defaultdict(list) 115 err = 0 116 with open(contexts_file, 'r') as contexts_read: 117 pattern = re.compile(r'(\S+)\s+u:object_r:\S+:s0') 118 line_index = 0 119 for line in contexts_read: 120 line_ = line.lstrip() 121 line_index += 1 122 if line_.startswith('#') or line_.strip() == '': 123 continue 124 match = pattern.match(line_) 125 if match: 126 type_hash[match.group(1)].append(line_index) 127 else: 128 print(contexts_file + ":" + 129 str(line_index) + " format check fail") 130 err = 1 131 contexts_read.close() 132 if err: 133 print("***********************************************************") 134 print("please check whether the format meets the following rules:") 135 print("[required format]: * u:object_r:*:s0") 136 print("***********************************************************") 137 raise Exception(err) 138 err = 0 139 for type_key in type_hash.keys(): 140 if len(type_hash[type_key]) > 1: 141 err = 1 142 str_seq = (contexts_file, ":") 143 err_msg = "".join(str_seq) 144 for linenum in type_hash[type_key]: 145 str_seq = (err_msg, str(linenum), ":") 146 err_msg = "".join(str_seq) 147 str_seq = (err_msg, "'type ", str(type_key), " is redefinition'") 148 err_msg = "".join(str_seq) 149 print(err_msg) 150 if err: 151 raise Exception(err) 152 153 154def check_common_contexts(args, contexts_file): 155 """ 156 check whether context used in contexts_file is defined in policy.31. 157 :param args: 158 :param contexts_file: path of contexts file 159 :return: 160 """ 161 check_redefinition(contexts_file) 162 163 check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"), 164 "-o", contexts_file + ".bin", 165 "-p", args.policy_file, 166 contexts_file] 167 run_command(check_cmd) 168 if os.path.exists(contexts_file + ".bin"): 169 os.unlink(contexts_file + ".bin") 170 171 172def echo_error(): 173 print("***********************************************************") 174 print("please check whether the format meets the following rules:") 175 print("[required format]: apl=* name=* domain=* type=*") 176 print("apl=*, apl should be one of system_core|system_basic|normal") 177 print("name=*, name is 'optional'") 178 print("domain=*, hapdomain selinux type") 179 print("type=*, hapdatafile selinux type") 180 print("***********************************************************") 181 182 183def sehap_process_line(line, line_index, contexts_write, domain, contexts_file): 184 line_ = line.lstrip() 185 if line_.startswith('#') or line_.strip() == '': 186 contexts_write.write(line) 187 return 188 189 pattern = re.compile( 190 r'apl=(system_core|system_basic|normal)\s+((name|debuggable)=\S+\s+)?domain=(\S+)\s+type=(\S+)\s*\n') 191 match = pattern.match(line_) 192 if match: 193 if domain: 194 line = match.group(1) + " u:r:" + match.group(4) + ":s0\n" 195 else: 196 line = match.group(1) + " u:object_r:" + match.group(5) + ":s0\n" 197 contexts_write.write(line) 198 else: 199 print(contexts_file + ":" + str(line_index) + " format check fail") 200 raise Exception(1) 201 202def check_sehap_contexts(args, contexts_file, domain): 203 """ 204 check domain or type defined in sehap_contexts. 205 :param args: 206 :param contexts_file: path of contexts file 207 :param domain: true for domain, false for type 208 :return: 209 """ 210 shutil.copyfile(contexts_file, contexts_file + "_bk") 211 try: 212 with open(contexts_file + "_bk", 'r') as contexts_read, open(contexts_file, 'w') as contexts_write: 213 line_index = 0 214 for line in contexts_read: 215 line_index += 1 216 sehap_process_line(line, line_index, contexts_write, domain, contexts_file) 217 except Exception as e: 218 shutil.move(contexts_file + "_bk", contexts_file) 219 echo_error() 220 raise e 221 222 check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"), 223 "-o", contexts_file + ".bin", 224 "-p", args.policy_file, 225 contexts_file] 226 ret = subprocess.run(" ".join(check_cmd), shell=True).returncode 227 228 if ret != 0: 229 shutil.move(contexts_file + "_bk", contexts_file) 230 raise Exception(ret) 231 232 shutil.move(contexts_file + "_bk", contexts_file) 233 234 if os.path.exists(contexts_file + ".bin"): 235 os.unlink(contexts_file + ".bin") 236 237def build_file_contexts(args, output_path, policy_path, all_policy_path): 238 if args.components == "default": 239 all_combined_file_contexts = os.path.join(output_path, "file_contexts") 240 else: 241 all_combined_file_contexts = os.path.join(output_path, "all_file_contexts") 242 file_contexts_list = traverse_folder_in_type(policy_path, "file_contexts") 243 combined_file_contexts = os.path.join(output_path, "file_contexts") 244 combine_contexts_file(file_contexts_list, combined_file_contexts) 245 246 all_file_contexts_list = traverse_folder_in_type( 247 all_policy_path, "file_contexts") 248 combine_contexts_file(all_file_contexts_list, all_combined_file_contexts) 249 250 check_redefinition(all_combined_file_contexts) 251 252 build_bin_cmd = [os.path.join(args.tool_path, "sefcontext_compile"), 253 "-o", os.path.join(args.dst_dir, "file_contexts.bin"), 254 "-p", args.policy_file, 255 all_combined_file_contexts] 256 run_command(build_bin_cmd) 257 258 259def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path): 260 if args.components == "default": 261 all_combined_contexts = output_path + contexts_file_name 262 else: 263 all_combined_contexts = output_path + "all_" + contexts_file_name 264 contexts_list = traverse_folder_in_type(policy_path, contexts_file_name) 265 combined_contexts = output_path + contexts_file_name 266 combine_contexts_file(contexts_list, combined_contexts) 267 268 all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name) 269 combine_contexts_file(all_contexts_list, all_combined_contexts) 270 check_common_contexts(args, all_combined_contexts) 271 272 273def build_sehap_contexts(args, output_path, policy_path): 274 contexts_list = traverse_folder_in_type( 275 policy_path, "sehap_contexts") 276 277 combined_contexts = os.path.join(output_path, "sehap_contexts") 278 combine_contexts_file(contexts_list, combined_contexts) 279 280 check_sehap_contexts(args, combined_contexts, 1) 281 check_sehap_contexts(args, combined_contexts, 0) 282 283 284def prepare_build_path(dir_list, root_dir, build_dir_list): 285 build_contexts_list = \ 286 ["base/security/selinux_adapter/sepolicy/base", "base/security/selinux_adapter/sepolicy/ohos_policy"] 287 build_contexts_list += dir_list.split(":") 288 289 for i in build_contexts_list: 290 if i == "" or i == "default": 291 continue 292 path = os.path.join(root_dir, i) 293 if (os.path.exists(path)): 294 build_dir_list.append(path) 295 else: 296 print("following path not exists!! {}".format(path)) 297 exit(-1) 298 299 300def traverse_folder_in_dir_name(search_dir, folder_suffix): 301 folder_list = [] 302 for root, dirs, _ in sorted(os.walk(search_dir)): 303 for dir_i in dirs: 304 if dir_i == folder_suffix: 305 folder_list.append(os.path.join(root, dir_i)) 306 return folder_list 307 308 309def main(args): 310 libpcre2_path = os.path.realpath("./clang_x64/thirdparty/pcre2/") 311 os.environ['LD_LIBRARY_PATH'] = libpcre2_path 312 output_path = args.dst_dir 313 policy_path = [] 314 prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path) 315 316 public_policy = [] 317 system_policy = [] 318 vendor_policy = [] 319 320 for item in policy_path: 321 public_policy += traverse_folder_in_dir_name(item, "public") 322 system_policy += traverse_folder_in_dir_name(item, "system") 323 vendor_policy += traverse_folder_in_dir_name(item, "vendor") 324 325 system_folder_list = public_policy + system_policy 326 vendor_folder_list = public_policy + vendor_policy 327 all_folder_list = public_policy + system_policy + vendor_policy 328 329 folder_list = [] 330 331 if args.components == "system": 332 folder_list = system_folder_list 333 elif args.components == "vendor": 334 folder_list = vendor_folder_list 335 else: 336 folder_list = all_folder_list 337 338 build_file_contexts(args, output_path, folder_list, all_folder_list) 339 build_common_contexts(args, output_path, "service_contexts", folder_list, all_folder_list) 340 build_common_contexts(args, output_path, "hdf_service_contexts", folder_list, all_folder_list) 341 build_common_contexts(args, output_path, "parameter_contexts", folder_list, all_folder_list) 342 build_sehap_contexts(args, output_path, all_folder_list) 343 344 345if __name__ == "__main__": 346 input_args = parse_args() 347 main(input_args) 348