1# Copyright 2021 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from ctypes import * 16import re 17import os 18import sys 19import platform 20import fc_sort 21 22### 23# Check whether the regex will match a file path starting with the provided 24# prefix 25# 26# Compares regex entries in file_contexts with a path prefix. Regex entries 27# are often more specific than this file prefix. For example, the regex could 28# be /system/bin/foo\.sh and the prefix could be /system. This function 29# loops over the regex removing characters from the end until 30# 1) there is a match - return True or 2) run out of characters - return 31# False. 32# 33def MatchPathPrefix(pathregex, prefix): 34 for i in range(len(pathregex), 0, -1): 35 try: 36 pattern = re.compile('^' + pathregex[0:i] + "$") 37 except: 38 continue 39 if pattern.match(prefix): 40 return True 41 return False 42 43def MatchPathPrefixes(pathregex, Prefixes): 44 for Prefix in Prefixes: 45 if MatchPathPrefix(pathregex, Prefix): 46 return True 47 return False 48 49class TERule: 50 def __init__(self, rule): 51 data = rule.split(',') 52 self.flavor = data[0] 53 self.sctx = data[1] 54 self.tctx = data[2] 55 self.tclass = data[3] 56 self.perms = set((data[4].strip()).split(' ')) 57 self.rule = rule 58 59class Policy: 60 __ExpandedRules = set() 61 __Rules = set() 62 __FcDict = None 63 __FcSorted = None 64 __GenfsDict = None 65 __libsepolwrap = None 66 __policydbP = None 67 __BUFSIZE = 2048 68 69 def AssertPathTypesDoNotHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr, ExcludedTypes = []): 70 # Query policy for the types associated with Attr 71 TypesPol = self.QueryTypeAttribute(Attr, True) - set(ExcludedTypes) 72 # Search file_contexts to find types associated with input paths. 73 TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix) 74 violators = TypesFc.intersection(TypesPol) 75 ret = "" 76 if len(violators) > 0: 77 ret += "The following types on " 78 ret += " ".join(str(x) for x in sorted(MatchPrefix)) 79 ret += " must not be associated with the " 80 ret += "\"" + Attr + "\" attribute: " 81 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 82 ret += " corresponding to files: " 83 ret += " ".join(str(x) for x in sorted(Files)) + "\n" 84 return ret 85 86 # Check that all types for "filesystem" have "attribute" associated with them 87 # for types labeled in genfs_contexts. 88 def AssertGenfsFilesystemTypesHaveAttr(self, Filesystem, Attr): 89 TypesPol = self.QueryTypeAttribute(Attr, True) 90 TypesGenfs = self.__GenfsDict[Filesystem] 91 violators = TypesGenfs.difference(TypesPol) 92 93 ret = "" 94 if len(violators) > 0: 95 ret += "The following types in " + Filesystem 96 ret += " must be associated with the " 97 ret += "\"" + Attr + "\" attribute: " 98 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 99 return ret 100 101 # Check that path prefixes that match MatchPrefix, and do not Match 102 # DoNotMatchPrefix have the attribute Attr. 103 # For example assert that all types in /sys, and not in /sys/kernel/debugfs 104 # have the sysfs_type attribute. 105 def AssertPathTypesHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr): 106 # Query policy for the types associated with Attr 107 TypesPol = self.QueryTypeAttribute(Attr, True) 108 # Search file_contexts to find paths/types that should be associated with 109 # Attr. 110 TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix) 111 violators = TypesFc.difference(TypesPol) 112 113 ret = "" 114 if len(violators) > 0: 115 ret += "The following types on " 116 ret += " ".join(str(x) for x in sorted(MatchPrefix)) 117 ret += " must be associated with the " 118 ret += "\"" + Attr + "\" attribute: " 119 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 120 ret += " corresponding to files: " 121 ret += " ".join(str(x) for x in sorted(Files)) + "\n" 122 return ret 123 124 def AssertPropertyOwnersAreExclusive(self): 125 systemProps = self.QueryTypeAttribute('system_property_type', True) 126 vendorProps = self.QueryTypeAttribute('vendor_property_type', True) 127 violators = systemProps.intersection(vendorProps) 128 ret = "" 129 if len(violators) > 0: 130 ret += "The following types have both system_property_type " 131 ret += "and vendor_property_type: " 132 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 133 return ret 134 135 # Return all file_contexts entries that map to the input Type. 136 def QueryFc(self, Type): 137 if Type in self.__FcDict: 138 return self.__FcDict[Type] 139 else: 140 return None 141 142 # Return all attributes associated with a type if IsAttr=False or 143 # all types associated with an attribute if IsAttr=True 144 def QueryTypeAttribute(self, Type, IsAttr): 145 TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, 146 create_string_buffer(Type.encode("ascii")), IsAttr) 147 if (TypeIterP == None): 148 sys.exit("Failed to initialize type iterator") 149 buf = create_string_buffer(self.__BUFSIZE) 150 TypeAttr = set() 151 while True: 152 ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE, 153 self.__policydbP, TypeIterP) 154 if ret == 0: 155 TypeAttr.add(buf.value.decode("ascii")) 156 continue 157 if ret == 1: 158 break; 159 # We should never get here. 160 sys.exit("Failed to import policy") 161 self.__libsepolwrap.destroy_type_iter(TypeIterP) 162 return TypeAttr 163 164 def __TERuleMatch(self, Rule, **kwargs): 165 # Match source type 166 if ("scontext" in kwargs and 167 len(kwargs['scontext']) > 0 and 168 Rule.sctx not in kwargs['scontext']): 169 return False 170 # Match target type 171 if ("tcontext" in kwargs and 172 len(kwargs['tcontext']) > 0 and 173 Rule.tctx not in kwargs['tcontext']): 174 return False 175 # Match target class 176 if ("tclass" in kwargs and 177 len(kwargs['tclass']) > 0 and 178 not bool(set([Rule.tclass]) & kwargs['tclass'])): 179 return False 180 # Match any perms 181 if ("perms" in kwargs and 182 len(kwargs['perms']) > 0 and 183 not bool(Rule.perms & kwargs['perms'])): 184 return False 185 return True 186 187 # resolve a type to its attributes or 188 # resolve an attribute to its types and attributes 189 # For example if scontext is the domain attribute, then we need to 190 # include all types with the domain attribute such as untrusted_app and 191 # priv_app and all the attributes of those types such as appdomain. 192 def ResolveTypeAttribute(self, Type): 193 types = self.GetAllTypes(False) 194 attributes = self.GetAllTypes(True) 195 196 if Type in types: 197 return self.QueryTypeAttribute(Type, False) 198 elif Type in attributes: 199 TypesAndAttributes = set() 200 Types = self.QueryTypeAttribute(Type, True) 201 TypesAndAttributes |= Types 202 for T in Types: 203 TypesAndAttributes |= self.QueryTypeAttribute(T, False) 204 return TypesAndAttributes 205 else: 206 return set() 207 208 # Return all TERules that match: 209 # (any scontext) or (any tcontext) or (any tclass) or (any perms), 210 # perms. 211 # Any unspecified paramenter will match all. 212 # 213 # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"]) 214 # Will return any rule with: 215 # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms) 216 def QueryTERule(self, **kwargs): 217 if len(self.__Rules) == 0: 218 self.__InitTERules() 219 220 # add any matching types and attributes for scontext and tcontext 221 if ("scontext" in kwargs and len(kwargs['scontext']) > 0): 222 scontext = set() 223 for sctx in kwargs['scontext']: 224 scontext |= self.ResolveTypeAttribute(sctx) 225 kwargs['scontext'] = scontext 226 if ("tcontext" in kwargs and len(kwargs['tcontext']) > 0): 227 tcontext = set() 228 for tctx in kwargs['tcontext']: 229 tcontext |= self.ResolveTypeAttribute(tctx) 230 kwargs['tcontext'] = tcontext 231 for Rule in self.__Rules: 232 if self.__TERuleMatch(Rule, **kwargs): 233 yield Rule 234 235 # Same as QueryTERule but only using the expanded ruleset. 236 # i.e. all attributes have been expanded to their various types. 237 def QueryExpandedTERule(self, **kwargs): 238 if len(self.__ExpandedRules) == 0: 239 self.__InitExpandedTERules() 240 for Rule in self.__ExpandedRules: 241 if self.__TERuleMatch(Rule, **kwargs): 242 yield Rule 243 244 def GetAllTypes(self, isAttr): 245 TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, None, isAttr) 246 if (TypeIterP == None): 247 sys.exit("Failed to initialize type iterator") 248 buf = create_string_buffer(self.__BUFSIZE) 249 AllTypes = set() 250 while True: 251 ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE, 252 self.__policydbP, TypeIterP) 253 if ret == 0: 254 AllTypes.add(buf.value.decode("ascii")) 255 continue 256 if ret == 1: 257 break; 258 # We should never get here. 259 sys.exit("Failed to import policy") 260 self.__libsepolwrap.destroy_type_iter(TypeIterP) 261 return AllTypes 262 263 def __ExactMatchPathPrefix(self, pathregex, prefix): 264 pattern = re.compile('^' + pathregex + "$") 265 if pattern.match(prefix): 266 return True 267 return False 268 269 # Return a tuple (prefix, i) where i is the index of the most specific 270 # match of prefix in the sorted file_contexts. This is useful for limiting a 271 # file_contexts search to matches that are more specific and omitting less 272 # specific matches. For example, finding all matches to prefix /data/vendor 273 # should not include /data(/.*)? if /data/vendor(/.*)? is also specified. 274 def __FcSortedIndex(self, prefix): 275 index = 0 276 for i in range(0, len(self.__FcSorted)): 277 if self.__ExactMatchPathPrefix(self.__FcSorted[i].path, prefix): 278 index = i 279 return prefix, index 280 281 # Return a tuple of (path, Type) for all matching paths. Use the sorted 282 # file_contexts and index returned from __FcSortedIndex() to limit results 283 # to results that are more specific than the prefix. 284 def __MatchPathPrefixTypes(self, prefix, index): 285 PathType = [] 286 for i in range(index, len(self.__FcSorted)): 287 if MatchPathPrefix(self.__FcSorted[i].path, prefix): 288 PathType.append((self.__FcSorted[i].path, self.__FcSorted[i].type)) 289 return PathType 290 291 # Return types that match MatchPrefixes but do not match 292 # DoNotMatchPrefixes 293 def __GetTypesAndFilesByFilePathPrefix(self, MatchPrefixes, DoNotMatchPrefixes): 294 Types = set() 295 Files = set() 296 297 MatchPrefixesWithIndex = [] 298 for MatchPrefix in MatchPrefixes: 299 MatchPrefixesWithIndex.append(self.__FcSortedIndex(MatchPrefix)) 300 301 for MatchPrefixWithIndex in MatchPrefixesWithIndex: 302 PathTypes = self.__MatchPathPrefixTypes(*MatchPrefixWithIndex) 303 for PathType in PathTypes: 304 if MatchPathPrefixes(PathType[0], DoNotMatchPrefixes): 305 continue 306 Types.add(PathType[1]) 307 Files.add(PathType[0]) 308 return Types, Files 309 310 def __GetTERules(self, policydbP, avtabIterP, Rules): 311 if Rules is None: 312 Rules = set() 313 buf = create_string_buffer(self.__BUFSIZE) 314 ret = 0 315 while True: 316 ret = self.__libsepolwrap.get_allow_rule(buf, self.__BUFSIZE, 317 policydbP, avtabIterP) 318 if ret == 0: 319 Rule = TERule(buf.value.decode("ascii")) 320 Rules.add(Rule) 321 continue 322 if ret == 1: 323 break; 324 # We should never get here. 325 sys.exit("Failed to import policy") 326 327 def __InitTERules(self): 328 avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP) 329 if (avtabIterP == None): 330 sys.exit("Failed to initialize avtab") 331 self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules) 332 self.__libsepolwrap.destroy_avtab(avtabIterP) 333 avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP) 334 if (avtabIterP == None): 335 sys.exit("Failed to initialize conditional avtab") 336 self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules) 337 self.__libsepolwrap.destroy_avtab(avtabIterP) 338 339 def __InitExpandedTERules(self): 340 avtabIterP = self.__libsepolwrap.init_expanded_avtab(self.__policydbP) 341 if (avtabIterP == None): 342 sys.exit("Failed to initialize avtab") 343 self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules) 344 self.__libsepolwrap.destroy_expanded_avtab(avtabIterP) 345 avtabIterP = self.__libsepolwrap.init_expanded_cond_avtab(self.__policydbP) 346 if (avtabIterP == None): 347 sys.exit("Failed to initialize conditional avtab") 348 self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules) 349 self.__libsepolwrap.destroy_expanded_avtab(avtabIterP) 350 351 # load ctypes-ified libsepol wrapper 352 def __InitLibsepolwrap(self, LibPath): 353 lib = CDLL(LibPath) 354 355 # int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp); 356 lib.get_allow_rule.restype = c_int 357 lib.get_allow_rule.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]; 358 # void *load_policy(const char *policy_path); 359 lib.load_policy.restype = c_void_p 360 lib.load_policy.argtypes = [c_char_p] 361 # void destroy_policy(void *policydbp); 362 lib.destroy_policy.argtypes = [c_void_p] 363 # void *init_expanded_avtab(void *policydbp); 364 lib.init_expanded_avtab.restype = c_void_p 365 lib.init_expanded_avtab.argtypes = [c_void_p] 366 # void *init_expanded_cond_avtab(void *policydbp); 367 lib.init_expanded_cond_avtab.restype = c_void_p 368 lib.init_expanded_cond_avtab.argtypes = [c_void_p] 369 # void destroy_expanded_avtab(void *avtab_iterp); 370 lib.destroy_expanded_avtab.argtypes = [c_void_p] 371 # void *init_avtab(void *policydbp); 372 lib.init_avtab.restype = c_void_p 373 lib.init_avtab.argtypes = [c_void_p] 374 # void *init_cond_avtab(void *policydbp); 375 lib.init_cond_avtab.restype = c_void_p 376 lib.init_cond_avtab.argtypes = [c_void_p] 377 # void destroy_avtab(void *avtab_iterp); 378 lib.destroy_avtab.argtypes = [c_void_p] 379 # int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp); 380 lib.get_type.restype = c_int 381 lib.get_type.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p] 382 # void *init_type_iter(void *policydbp, const char *type, bool is_attr); 383 lib.init_type_iter.restype = c_void_p 384 lib.init_type_iter.argtypes = [c_void_p, c_char_p, c_bool] 385 # void destroy_type_iter(void *type_iterp); 386 lib.destroy_type_iter.argtypes = [c_void_p] 387 # void *init_genfs_iter(void *policydbp) 388 lib.init_genfs_iter.restype = c_void_p 389 lib.init_genfs_iter.argtypes = [c_void_p] 390 # int get_genfs(char *out, size_t max_size, void *genfs_iterp); 391 lib.get_genfs.restype = c_int 392 lib.get_genfs.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p] 393 # void destroy_genfs_iter(void *genfs_iterp) 394 lib.destroy_genfs_iter.argtypes = [c_void_p] 395 396 self.__libsepolwrap = lib 397 398 def __GenfsDictAdd(self, Dict, buf): 399 fs, buf = buf.split(' ', 1) 400 path, context = buf.rsplit(' ', 1) 401 Type = context.split(":")[2] 402 if not fs in Dict: 403 Dict[fs] = {Type} 404 else: 405 Dict[fs].add(Type) 406 407 def __InitGenfsCon(self): 408 self.__GenfsDict = {} 409 GenfsIterP = self.__libsepolwrap.init_genfs_iter(self.__policydbP) 410 if (GenfsIterP == None): 411 sys.exit("Failed to retreive genfs entries") 412 buf = create_string_buffer(self.__BUFSIZE) 413 while True: 414 ret = self.__libsepolwrap.get_genfs(buf, self.__BUFSIZE, 415 self.__policydbP, GenfsIterP) 416 if ret == 0: 417 self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii")) 418 continue 419 if ret == 1: 420 self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii")) 421 break; 422 # We should never get here. 423 sys.exit("Failed to get genfs entries") 424 self.__libsepolwrap.destroy_genfs_iter(GenfsIterP) 425 426 # load file_contexts 427 def __InitFC(self, FcPaths): 428 if FcPaths is None: 429 return 430 fc = [] 431 for path in FcPaths: 432 if not os.path.exists(path): 433 sys.exit("file_contexts file " + path + " does not exist.") 434 fd = open(path, "r") 435 fc += fd.readlines() 436 fd.close() 437 self.__FcDict = {} 438 for i in fc: 439 rec = i.split() 440 try: 441 t = rec[-1].split(":")[2] 442 if t in self.__FcDict: 443 self.__FcDict[t].append(rec[0]) 444 else: 445 self.__FcDict[t] = [rec[0]] 446 except: 447 pass 448 self.__FcSorted = fc_sort.sort(FcPaths) 449 450 # load policy 451 def __InitPolicy(self, PolicyPath): 452 cPolicyPath = create_string_buffer(PolicyPath.encode("ascii")) 453 self.__policydbP = self.__libsepolwrap.load_policy(cPolicyPath) 454 if (self.__policydbP is None): 455 sys.exit("Failed to load policy") 456 457 def __init__(self, PolicyPath, FcPaths, LibPath): 458 self.__InitLibsepolwrap(LibPath) 459 self.__InitFC(FcPaths) 460 self.__InitPolicy(PolicyPath) 461 self.__InitGenfsCon() 462 463 def __del__(self): 464 if self.__policydbP is not None: 465 self.__libsepolwrap.destroy_policy(self.__policydbP) 466