1#!/usr/bin/env python 2# coding: utf-8 3 4""" 5Copyright (c) 2021-2022 Huawei Device Co., Ltd. 6Licensed under the Apache License, Version 2.0 (the "License"); 7you may not use this file except in compliance with the License. 8You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12Unless required by applicable law or agreed to in writing, software 13distributed under the License is distributed on an "AS IS" BASIS, 14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15See the License for the specific language governing permissions and 16limitations under the License. 17 18""" 19 20import os 21import argparse 22import re 23import shutil 24import subprocess 25import sys 26import platform 27from collections import defaultdict 28 29 30def parse_args(): 31 parser = argparse.ArgumentParser() 32 parser.add_argument( 33 '--dst-dir', help='the output dest path', required=True) 34 parser.add_argument('--tool-path', 35 help='the sefcontext_compile bin path', required=True) 36 parser.add_argument('--policy-file', 37 help='the policy.31 file', required=True) 38 parser.add_argument('--source-root-dir', 39 help='prj root path', required=True) 40 parser.add_argument('--policy_dir_list', 41 help='policy dirs need to be included', required=True) 42 parser.add_argument('--components', 43 help='system or vendor or default', required=True) 44 return parser.parse_args() 45 46 47def run_command(in_cmd): 48 cmdstr = " ".join(in_cmd) 49 ret = subprocess.run(cmdstr, shell=True).returncode 50 if ret != 0: 51 raise Exception(ret) 52 53 54def traverse_folder_in_type(search_dir_list, file_suffix): 55 """ 56 for special folder search_dir, find all files endwith file_suffix. 57 :param search_dir: path to search 58 :param file_suffix: postfix of file name 59 :return: file list 60 """ 61 flag = 0 62 policy_file_list = [] 63 64 for item in search_dir_list: 65 for root, _, files in sorted(os.walk(item)): 66 filtered_files = [f for f in files if f.endswith(file_suffix)] 67 for each_file in filtered_files: 68 file_list_path = os.path.join(root, each_file) 69 flag |= check_contexts_file(file_list_path) 70 policy_file_list.append(file_list_path) 71 72 if flag: 73 raise Exception(flag) 74 policy_file_list.sort() 75 return " ".join(str(x) for x in policy_file_list) 76 77 78def check_contexts_file(contexts_file): 79 """ 80 Check the format of contexts file. 81 :param contexts_file: list of te file 82 :return: 83 """ 84 err = 0 85 lines = [] 86 with open(contexts_file, 'rb') as fp: 87 lines = fp.readlines() 88 if len(lines) == 0: 89 return 0 90 last_line = lines[-1] 91 if b'\n' not in last_line: 92 print("".join((contexts_file, " : need an empty line at end \n"))) 93 err = 1 94 for line in lines: 95 if line.endswith(b'\r\n') or line.endswith(b'\r'): 96 print("".join((contexts_file, " : must be unix format\n"))) 97 err = 1 98 break 99 return err 100 101 102def combine_contexts_file(file_contexts_list, combined_file_contexts): 103 cat_cmd = ["cat", 104 file_contexts_list, 105 ">", combined_file_contexts + "_tmp"] 106 run_command(cat_cmd) 107 108 grep_cmd = ["grep -v ^#", 109 combined_file_contexts + "_tmp", 110 "| grep -v ^$", 111 ">", combined_file_contexts] 112 run_command(grep_cmd) 113 114 115def check_redefinition(contexts_file): 116 type_hash = defaultdict(list) 117 err = 0 118 with open(contexts_file, 'r') as contexts_read: 119 pattern = re.compile(r'(\S+)\s+u:object_r:\S+:s0') 120 line_index = 0 121 for line in contexts_read: 122 line_ = line.lstrip() 123 line_index += 1 124 if line_.startswith('#') or line_.strip() == '': 125 continue 126 match = pattern.match(line_) 127 if match: 128 type_hash[match.group(1)].append(line_index) 129 else: 130 print(contexts_file + ":" + 131 str(line_index) + " format check fail") 132 err = 1 133 contexts_read.close() 134 if err: 135 print("***********************************************************") 136 print("please check whether the format meets the following rules:") 137 print("[required format]: * u:object_r:*:s0") 138 print("***********************************************************") 139 raise Exception(err) 140 err = 0 141 for type_key in type_hash.keys(): 142 if len(type_hash[type_key]) > 1: 143 err = 1 144 str_seq = (contexts_file, ":") 145 err_msg = "".join(str_seq) 146 for linenum in type_hash[type_key]: 147 str_seq = (err_msg, str(linenum), ":") 148 err_msg = "".join(str_seq) 149 str_seq = (err_msg, "'type ", str(type_key), " is redefinition'") 150 err_msg = "".join(str_seq) 151 print(err_msg) 152 if err: 153 raise Exception(err) 154 155 156def check_common_contexts(args, contexts_file): 157 """ 158 check whether context used in contexts_file is defined in policy.31. 159 :param args: 160 :param contexts_file: path of contexts file 161 :return: 162 """ 163 check_redefinition(contexts_file) 164 165 check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"), 166 "-o", contexts_file + ".bin", 167 "-p", args.policy_file, 168 contexts_file] 169 run_command(check_cmd) 170 if os.path.exists(contexts_file + ".bin"): 171 os.unlink(contexts_file + ".bin") 172 173 174def echo_error(): 175 print("***********************************************************") 176 print("please check whether the format meets the following rules:") 177 print("[required format]: apl=* [name=*] [debuggable=*] [extension=*] [extra=*] domain=* type=*") 178 print("apl=*, apl should be one of system_core|system_basic|normal") 179 print("name=*, name is 'optional'") 180 print("debuggable=*, debuggable is 'optional'") 181 print("extension=*, extension is 'optional'") 182 print("extra=*, extra is 'optional'") 183 print("domain=*, hapdomain selinux type") 184 print("type=*, hapdatafile selinux type") 185 print("***********************************************************") 186 187 188def sehap_check_line(line, line_index, contexts_write, domain, contexts_file): 189 line_ = line.lstrip() 190 if line_.startswith('#') or line_.strip() == '': 191 contexts_write.write(line) 192 return 193 194 pattern = re.compile( 195 r'apl=(system_core|system_basic|normal)\s+' 196 r'((name|debuggable)=\S+\s+)?' 197 r'(extra=\S+\s+)?' 198 r'(extension=\S+\s+)?' 199 r'domain=(\S+)\s+' 200 r'type=(\S+)\s*' 201 r'.*\n', 202 re.DOTALL 203 ) 204 match = pattern.match(line_) 205 if match: 206 if domain: 207 line = match.group(1) + " u:r:" + match.group(6) + ":s0\n" 208 else: 209 line = match.group(1) + " u:object_r:" + match.group(7) + ":s0\n" 210 contexts_write.write(line) 211 else: 212 print(contexts_file + ":" + str(line_index) + " format check fail") 213 raise Exception(1) 214 215 216def check_sehap_contexts(args, contexts_file, domain): 217 """ 218 check domain or type defined in sehap_contexts. 219 :param args: 220 :param contexts_file: path of contexts file 221 :param domain: true for domain, false for type 222 :return: 223 """ 224 shutil.copyfile(contexts_file, contexts_file + "_bk") 225 try: 226 with open(contexts_file + "_bk", 'r') as contexts_read, open(contexts_file, 'w') as contexts_write: 227 line_index = 0 228 for line in contexts_read: 229 line_index += 1 230 sehap_check_line(line, line_index, contexts_write, domain, contexts_file) 231 except Exception as e: 232 shutil.move(contexts_file + "_bk", contexts_file) 233 echo_error() 234 raise e 235 236 check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"), 237 "-o", contexts_file + ".bin", 238 "-p", args.policy_file, 239 contexts_file] 240 ret = subprocess.run(" ".join(check_cmd), shell=True).returncode 241 242 if ret != 0: 243 shutil.move(contexts_file + "_bk", contexts_file) 244 raise Exception(ret) 245 246 shutil.move(contexts_file + "_bk", contexts_file) 247 248 if os.path.exists(contexts_file + ".bin"): 249 os.unlink(contexts_file + ".bin") 250 251 252def build_file_contexts(args, output_path, policy_path, all_policy_path): 253 if args.components == "default": 254 all_combined_file_contexts = os.path.join(output_path, "file_contexts") 255 else: 256 all_combined_file_contexts = os.path.join(output_path, "all_file_contexts") 257 file_contexts_list = traverse_folder_in_type(policy_path, "file_contexts") 258 combined_file_contexts = os.path.join(output_path, "file_contexts") 259 combine_contexts_file(file_contexts_list, combined_file_contexts) 260 261 all_file_contexts_list = traverse_folder_in_type( 262 all_policy_path, "file_contexts") 263 combine_contexts_file(all_file_contexts_list, all_combined_file_contexts) 264 265 check_redefinition(all_combined_file_contexts) 266 267 build_bin_cmd = [os.path.join(args.tool_path, "sefcontext_compile"), 268 "-o", os.path.join(args.dst_dir, "file_contexts.bin"), 269 "-p", args.policy_file, 270 all_combined_file_contexts] 271 run_command(build_bin_cmd) 272 273 274def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path): 275 if args.components == "default": 276 all_combined_contexts = output_path + contexts_file_name 277 else: 278 all_combined_contexts = output_path + "all_" + contexts_file_name 279 contexts_list = traverse_folder_in_type(policy_path, contexts_file_name) 280 combined_contexts = output_path + contexts_file_name 281 combine_contexts_file(contexts_list, combined_contexts) 282 283 all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name) 284 combine_contexts_file(all_contexts_list, all_combined_contexts) 285 check_common_contexts(args, all_combined_contexts) 286 287 288def build_sehap_contexts(args, output_path, policy_path): 289 contexts_list = traverse_folder_in_type( 290 policy_path, "sehap_contexts") 291 292 combined_contexts = os.path.join(output_path, "sehap_contexts") 293 combine_contexts_file(contexts_list, combined_contexts) 294 295 check_sehap_contexts(args, combined_contexts, 1) 296 check_sehap_contexts(args, combined_contexts, 0) 297 298 299def prepare_build_path(dir_list, root_dir, build_dir_list): 300 build_contexts_list = \ 301 ["base/security/selinux_adapter/sepolicy/base", "base/security/selinux_adapter/sepolicy/ohos_policy"] 302 build_contexts_list += dir_list.split(":") 303 304 for i in build_contexts_list: 305 if i == "" or i == "default": 306 continue 307 path = os.path.join(root_dir, i) 308 if (os.path.exists(path)): 309 build_dir_list.append(path) 310 else: 311 print("following path not exists!! {}".format(path)) 312 exit(-1) 313 314 315def traverse_folder_in_dir_name(search_dir, folder_suffix): 316 folder_list = [] 317 for root, dirs, _ in sorted(os.walk(search_dir)): 318 for dir_i in dirs: 319 if dir_i == folder_suffix: 320 folder_list.append(os.path.join(root, dir_i)) 321 return folder_list 322 323 324def main(args): 325 if sys.platform == "linux" and platform.machine().lower() == "aarch64": 326 libpcre2_path = os.path.realpath("./clang_arm64/thirdparty/pcre2/") 327 else: 328 libpcre2_path = os.path.realpath("./clang_x64/thirdparty/pcre2/") 329 os.environ['LD_LIBRARY_PATH'] = libpcre2_path 330 output_path = args.dst_dir 331 policy_path = [] 332 prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path) 333 334 public_policy = [] 335 system_policy = [] 336 vendor_policy = [] 337 338 for item in policy_path: 339 public_policy += traverse_folder_in_dir_name(item, "public") 340 system_policy += traverse_folder_in_dir_name(item, "system") 341 vendor_policy += traverse_folder_in_dir_name(item, "vendor") 342 343 system_folder_list = public_policy + system_policy 344 vendor_folder_list = public_policy + vendor_policy 345 all_folder_list = public_policy + system_policy + vendor_policy 346 347 folder_list = [] 348 349 if args.components == "system": 350 folder_list = system_folder_list 351 elif args.components == "vendor": 352 folder_list = vendor_folder_list 353 else: 354 folder_list = all_folder_list 355 356 build_file_contexts(args, output_path, folder_list, all_folder_list) 357 build_common_contexts(args, output_path, "service_contexts", folder_list, all_folder_list) 358 build_common_contexts(args, output_path, "hdf_service_contexts", folder_list, all_folder_list) 359 build_common_contexts(args, output_path, "parameter_contexts", folder_list, all_folder_list) 360 build_sehap_contexts(args, output_path, all_folder_list) 361 362 363if __name__ == "__main__": 364 input_args = parse_args() 365 main(input_args) 366