1from ctypes import * 2import re 3import os 4import sys 5import platform 6import fc_sort 7 8### 9# Check whether the regex will match a file path starting with the provided 10# prefix 11# 12# Compares regex entries in file_contexts with a path prefix. Regex entries 13# are often more specific than this file prefix. For example, the regex could 14# be /system/bin/foo\.sh and the prefix could be /system. This function 15# loops over the regex removing characters from the end until 16# 1) there is a match - return True or 2) run out of characters - return 17# False. 18# 19def MatchPathPrefix(pathregex, prefix): 20 for i in range(len(pathregex), 0, -1): 21 try: 22 pattern = re.compile('^' + pathregex[0:i] + "$") 23 except: 24 continue 25 if pattern.match(prefix): 26 return True 27 return False 28 29def MatchPathPrefixes(pathregex, Prefixes): 30 for Prefix in Prefixes: 31 if MatchPathPrefix(pathregex, Prefix): 32 return True 33 return False 34 35class TERule: 36 def __init__(self, rule): 37 data = rule.split(',') 38 self.flavor = data[0] 39 self.sctx = data[1] 40 self.tctx = data[2] 41 self.tclass = data[3] 42 self.perms = set((data[4].strip()).split(' ')) 43 self.rule = rule 44 45class Policy: 46 __ExpandedRules = set() 47 __Rules = set() 48 __FcDict = None 49 __FcSorted = None 50 __GenfsDict = None 51 __libsepolwrap = None 52 __policydbP = None 53 __BUFSIZE = 2048 54 55 def AssertPathTypesDoNotHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr, ExcludedTypes = []): 56 # Query policy for the types associated with Attr 57 TypesPol = self.QueryTypeAttribute(Attr, True) - set(ExcludedTypes) 58 # Search file_contexts to find types associated with input paths. 59 TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix) 60 violators = TypesFc.intersection(TypesPol) 61 ret = "" 62 if len(violators) > 0: 63 ret += "The following types on " 64 ret += " ".join(str(x) for x in sorted(MatchPrefix)) 65 ret += " must not be associated with the " 66 ret += "\"" + Attr + "\" attribute: " 67 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 68 ret += " corresponding to files: " 69 ret += " ".join(str(x) for x in sorted(Files)) + "\n" 70 return ret 71 72 # Check that all types for "filesystem" have "attribute" associated with them 73 # for types labeled in genfs_contexts. 74 def AssertGenfsFilesystemTypesHaveAttr(self, Filesystem, Attr): 75 TypesPol = self.QueryTypeAttribute(Attr, True) 76 TypesGenfs = self.__GenfsDict[Filesystem] 77 violators = TypesGenfs.difference(TypesPol) 78 79 ret = "" 80 if len(violators) > 0: 81 ret += "The following types in " + Filesystem 82 ret += " must be associated with the " 83 ret += "\"" + Attr + "\" attribute: " 84 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 85 return ret 86 87 # Check that path prefixes that match MatchPrefix, and do not Match 88 # DoNotMatchPrefix have the attribute Attr. 89 # For example assert that all types in /sys, and not in /sys/kernel/debugfs 90 # have the sysfs_type attribute. 91 def AssertPathTypesHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr): 92 # Query policy for the types associated with Attr 93 TypesPol = self.QueryTypeAttribute(Attr, True) 94 # Search file_contexts to find paths/types that should be associated with 95 # Attr. 96 TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix) 97 violators = TypesFc.difference(TypesPol) 98 99 ret = "" 100 if len(violators) > 0: 101 ret += "The following types on " 102 ret += " ".join(str(x) for x in sorted(MatchPrefix)) 103 ret += " must be associated with the " 104 ret += "\"" + Attr + "\" attribute: " 105 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 106 ret += " corresponding to files: " 107 ret += " ".join(str(x) for x in sorted(Files)) + "\n" 108 return ret 109 110 def AssertPropertyOwnersAreExclusive(self): 111 systemProps = self.QueryTypeAttribute('system_property_type', True) 112 vendorProps = self.QueryTypeAttribute('vendor_property_type', True) 113 violators = systemProps.intersection(vendorProps) 114 ret = "" 115 if len(violators) > 0: 116 ret += "The following types have both system_property_type " 117 ret += "and vendor_property_type: " 118 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 119 return ret 120 121 # Return all file_contexts entries that map to the input Type. 122 def QueryFc(self, Type): 123 if Type in self.__FcDict: 124 return self.__FcDict[Type] 125 else: 126 return None 127 128 # Return all attributes associated with a type if IsAttr=False or 129 # all types associated with an attribute if IsAttr=True 130 def QueryTypeAttribute(self, Type, IsAttr): 131 TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, 132 create_string_buffer(Type), IsAttr) 133 if (TypeIterP == None): 134 sys.exit("Failed to initialize type iterator") 135 buf = create_string_buffer(self.__BUFSIZE) 136 TypeAttr = set() 137 while True: 138 ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE, 139 self.__policydbP, TypeIterP) 140 if ret == 0: 141 TypeAttr.add(buf.value) 142 continue 143 if ret == 1: 144 break; 145 # We should never get here. 146 sys.exit("Failed to import policy") 147 self.__libsepolwrap.destroy_type_iter(TypeIterP) 148 return TypeAttr 149 150 def __TERuleMatch(self, Rule, **kwargs): 151 # Match source type 152 if ("scontext" in kwargs and 153 len(kwargs['scontext']) > 0 and 154 Rule.sctx not in kwargs['scontext']): 155 return False 156 # Match target type 157 if ("tcontext" in kwargs and 158 len(kwargs['tcontext']) > 0 and 159 Rule.tctx not in kwargs['tcontext']): 160 return False 161 # Match target class 162 if ("tclass" in kwargs and 163 len(kwargs['tclass']) > 0 and 164 not bool(set([Rule.tclass]) & kwargs['tclass'])): 165 return False 166 # Match any perms 167 if ("perms" in kwargs and 168 len(kwargs['perms']) > 0 and 169 not bool(Rule.perms & kwargs['perms'])): 170 return False 171 return True 172 173 # resolve a type to its attributes or 174 # resolve an attribute to its types and attributes 175 # For example if scontext is the domain attribute, then we need to 176 # include all types with the domain attribute such as untrusted_app and 177 # priv_app and all the attributes of those types such as appdomain. 178 def ResolveTypeAttribute(self, Type): 179 types = self.GetAllTypes(False) 180 attributes = self.GetAllTypes(True) 181 182 if Type in types: 183 return self.QueryTypeAttribute(Type, False) 184 elif Type in attributes: 185 TypesAndAttributes = set() 186 Types = self.QueryTypeAttribute(Type, True) 187 TypesAndAttributes |= Types 188 for T in Types: 189 TypesAndAttributes |= self.QueryTypeAttribute(T, False) 190 return TypesAndAttributes 191 else: 192 return set() 193 194 # Return all TERules that match: 195 # (any scontext) or (any tcontext) or (any tclass) or (any perms), 196 # perms. 197 # Any unspecified paramenter will match all. 198 # 199 # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"]) 200 # Will return any rule with: 201 # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms) 202 def QueryTERule(self, **kwargs): 203 if len(self.__Rules) == 0: 204 self.__InitTERules() 205 206 # add any matching types and attributes for scontext and tcontext 207 if ("scontext" in kwargs and len(kwargs['scontext']) > 0): 208 scontext = set() 209 for sctx in kwargs['scontext']: 210 scontext |= self.ResolveTypeAttribute(sctx) 211 kwargs['scontext'] = scontext 212 if ("tcontext" in kwargs and len(kwargs['tcontext']) > 0): 213 tcontext = set() 214 for tctx in kwargs['tcontext']: 215 tcontext |= self.ResolveTypeAttribute(tctx) 216 kwargs['tcontext'] = tcontext 217 for Rule in self.__Rules: 218 if self.__TERuleMatch(Rule, **kwargs): 219 yield Rule 220 221 # Same as QueryTERule but only using the expanded ruleset. 222 # i.e. all attributes have been expanded to their various types. 223 def QueryExpandedTERule(self, **kwargs): 224 if len(self.__ExpandedRules) == 0: 225 self.__InitExpandedTERules() 226 for Rule in self.__ExpandedRules: 227 if self.__TERuleMatch(Rule, **kwargs): 228 yield Rule 229 230 def GetAllTypes(self, isAttr): 231 TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, None, isAttr) 232 if (TypeIterP == None): 233 sys.exit("Failed to initialize type iterator") 234 buf = create_string_buffer(self.__BUFSIZE) 235 AllTypes = set() 236 while True: 237 ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE, 238 self.__policydbP, TypeIterP) 239 if ret == 0: 240 AllTypes.add(buf.value) 241 continue 242 if ret == 1: 243 break; 244 # We should never get here. 245 sys.exit("Failed to import policy") 246 self.__libsepolwrap.destroy_type_iter(TypeIterP) 247 return AllTypes 248 249 def __ExactMatchPathPrefix(self, pathregex, prefix): 250 pattern = re.compile('^' + pathregex + "$") 251 if pattern.match(prefix): 252 return True 253 return False 254 255 # Return a tuple (prefix, i) where i is the index of the most specific 256 # match of prefix in the sorted file_contexts. This is useful for limiting a 257 # file_contexts search to matches that are more specific and omitting less 258 # specific matches. For example, finding all matches to prefix /data/vendor 259 # should not include /data(/.*)? if /data/vendor(/.*)? is also specified. 260 def __FcSortedIndex(self, prefix): 261 index = 0 262 for i in range(0, len(self.__FcSorted)): 263 if self.__ExactMatchPathPrefix(self.__FcSorted[i].path, prefix): 264 index = i 265 return prefix, index 266 267 # Return a tuple of (path, Type) for all matching paths. Use the sorted 268 # file_contexts and index returned from __FcSortedIndex() to limit results 269 # to results that are more specific than the prefix. 270 def __MatchPathPrefixTypes(self, prefix, index): 271 PathType = [] 272 for i in range(index, len(self.__FcSorted)): 273 if MatchPathPrefix(self.__FcSorted[i].path, prefix): 274 PathType.append((self.__FcSorted[i].path, self.__FcSorted[i].Type)) 275 return PathType 276 277 # Return types that match MatchPrefixes but do not match 278 # DoNotMatchPrefixes 279 def __GetTypesAndFilesByFilePathPrefix(self, MatchPrefixes, DoNotMatchPrefixes): 280 Types = set() 281 Files = set() 282 283 MatchPrefixesWithIndex = [] 284 for MatchPrefix in MatchPrefixes: 285 MatchPrefixesWithIndex.append(self.__FcSortedIndex(MatchPrefix)) 286 287 for MatchPrefixWithIndex in MatchPrefixesWithIndex: 288 PathTypes = self.__MatchPathPrefixTypes(*MatchPrefixWithIndex) 289 for PathType in PathTypes: 290 if MatchPathPrefixes(PathType[0], DoNotMatchPrefixes): 291 continue 292 Types.add(PathType[1]) 293 Files.add(PathType[0]) 294 return Types, Files 295 296 def __GetTERules(self, policydbP, avtabIterP, Rules): 297 if Rules is None: 298 Rules = set() 299 buf = create_string_buffer(self.__BUFSIZE) 300 ret = 0 301 while True: 302 ret = self.__libsepolwrap.get_allow_rule(buf, self.__BUFSIZE, 303 policydbP, avtabIterP) 304 if ret == 0: 305 Rule = TERule(buf.value) 306 Rules.add(Rule) 307 continue 308 if ret == 1: 309 break; 310 # We should never get here. 311 sys.exit("Failed to import policy") 312 313 def __InitTERules(self): 314 avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP) 315 if (avtabIterP == None): 316 sys.exit("Failed to initialize avtab") 317 self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules) 318 self.__libsepolwrap.destroy_avtab(avtabIterP) 319 avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP) 320 if (avtabIterP == None): 321 sys.exit("Failed to initialize conditional avtab") 322 self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules) 323 self.__libsepolwrap.destroy_avtab(avtabIterP) 324 325 def __InitExpandedTERules(self): 326 avtabIterP = self.__libsepolwrap.init_expanded_avtab(self.__policydbP) 327 if (avtabIterP == None): 328 sys.exit("Failed to initialize avtab") 329 self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules) 330 self.__libsepolwrap.destroy_expanded_avtab(avtabIterP) 331 avtabIterP = self.__libsepolwrap.init_expanded_cond_avtab(self.__policydbP) 332 if (avtabIterP == None): 333 sys.exit("Failed to initialize conditional avtab") 334 self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules) 335 self.__libsepolwrap.destroy_expanded_avtab(avtabIterP) 336 337 # load ctypes-ified libsepol wrapper 338 def __InitLibsepolwrap(self, LibPath): 339 lib = CDLL(LibPath) 340 341 # int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp); 342 lib.get_allow_rule.restype = c_int 343 lib.get_allow_rule.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]; 344 # void *load_policy(const char *policy_path); 345 lib.load_policy.restype = c_void_p 346 lib.load_policy.argtypes = [c_char_p] 347 # void destroy_policy(void *policydbp); 348 lib.destroy_policy.argtypes = [c_void_p] 349 # void *init_expanded_avtab(void *policydbp); 350 lib.init_expanded_avtab.restype = c_void_p 351 lib.init_expanded_avtab.argtypes = [c_void_p] 352 # void *init_expanded_cond_avtab(void *policydbp); 353 lib.init_expanded_cond_avtab.restype = c_void_p 354 lib.init_expanded_cond_avtab.argtypes = [c_void_p] 355 # void destroy_expanded_avtab(void *avtab_iterp); 356 lib.destroy_expanded_avtab.argtypes = [c_void_p] 357 # void *init_avtab(void *policydbp); 358 lib.init_avtab.restype = c_void_p 359 lib.init_avtab.argtypes = [c_void_p] 360 # void *init_cond_avtab(void *policydbp); 361 lib.init_cond_avtab.restype = c_void_p 362 lib.init_cond_avtab.argtypes = [c_void_p] 363 # void destroy_avtab(void *avtab_iterp); 364 lib.destroy_avtab.argtypes = [c_void_p] 365 # int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp); 366 lib.get_type.restype = c_int 367 lib.get_type.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p] 368 # void *init_type_iter(void *policydbp, const char *type, bool is_attr); 369 lib.init_type_iter.restype = c_void_p 370 lib.init_type_iter.argtypes = [c_void_p, c_char_p, c_bool] 371 # void destroy_type_iter(void *type_iterp); 372 lib.destroy_type_iter.argtypes = [c_void_p] 373 # void *init_genfs_iter(void *policydbp) 374 lib.init_genfs_iter.restype = c_void_p 375 lib.init_genfs_iter.argtypes = [c_void_p] 376 # int get_genfs(char *out, size_t max_size, void *genfs_iterp); 377 lib.get_genfs.restype = c_int 378 lib.get_genfs.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p] 379 # void destroy_genfs_iter(void *genfs_iterp) 380 lib.destroy_genfs_iter.argtypes = [c_void_p] 381 382 self.__libsepolwrap = lib 383 384 def __GenfsDictAdd(self, Dict, buf): 385 fs, path, context = buf.split(" ") 386 Type = context.split(":")[2] 387 if not fs in Dict: 388 Dict[fs] = {Type} 389 else: 390 Dict[fs].add(Type) 391 392 def __InitGenfsCon(self): 393 self.__GenfsDict = {} 394 GenfsIterP = self.__libsepolwrap.init_genfs_iter(self.__policydbP) 395 if (GenfsIterP == None): 396 sys.exit("Failed to retreive genfs entries") 397 buf = create_string_buffer(self.__BUFSIZE) 398 while True: 399 ret = self.__libsepolwrap.get_genfs(buf, self.__BUFSIZE, 400 self.__policydbP, GenfsIterP) 401 if ret == 0: 402 self.__GenfsDictAdd(self.__GenfsDict, buf.value) 403 continue 404 if ret == 1: 405 self.__GenfsDictAdd(self.__GenfsDict, buf.value) 406 break; 407 # We should never get here. 408 sys.exit("Failed to get genfs entries") 409 self.__libsepolwrap.destroy_genfs_iter(GenfsIterP) 410 411 # load file_contexts 412 def __InitFC(self, FcPaths): 413 if FcPaths is None: 414 return 415 fc = [] 416 for path in FcPaths: 417 if not os.path.exists(path): 418 sys.exit("file_contexts file " + path + " does not exist.") 419 fd = open(path, "r") 420 fc += fd.readlines() 421 fd.close() 422 self.__FcDict = {} 423 for i in fc: 424 rec = i.split() 425 try: 426 t = rec[-1].split(":")[2] 427 if t in self.__FcDict: 428 self.__FcDict[t].append(rec[0]) 429 else: 430 self.__FcDict[t] = [rec[0]] 431 except: 432 pass 433 self.__FcSorted = fc_sort.FcSort(FcPaths) 434 435 # load policy 436 def __InitPolicy(self, PolicyPath): 437 cPolicyPath = create_string_buffer(PolicyPath) 438 self.__policydbP = self.__libsepolwrap.load_policy(cPolicyPath) 439 if (self.__policydbP is None): 440 sys.exit("Failed to load policy") 441 442 def __init__(self, PolicyPath, FcPaths, LibPath): 443 self.__InitLibsepolwrap(LibPath) 444 self.__InitFC(FcPaths) 445 self.__InitPolicy(PolicyPath) 446 self.__InitGenfsCon() 447 448 def __del__(self): 449 if self.__policydbP is not None: 450 self.__libsepolwrap.destroy_policy(self.__policydbP) 451