1# Copyright 2021 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from ctypes import * 16import re 17import os 18import sys 19import platform 20import fc_sort 21 22### 23# Check whether the regex will match a file path starting with the provided 24# prefix 25# 26# Compares regex entries in file_contexts with a path prefix. Regex entries 27# are often more specific than this file prefix. For example, the regex could 28# be /system/bin/foo\.sh and the prefix could be /system. This function 29# loops over the regex removing characters from the end until 30# 1) there is a match - return True or 2) run out of characters - return 31# False. 32# 33def MatchPathPrefix(pathregex, prefix): 34 for i in range(len(pathregex), 0, -1): 35 try: 36 pattern = re.compile('^' + pathregex[0:i] + "$") 37 except: 38 continue 39 if pattern.match(prefix): 40 return True 41 return False 42 43def MatchPathPrefixes(pathregex, Prefixes): 44 for Prefix in Prefixes: 45 if MatchPathPrefix(pathregex, Prefix): 46 return True 47 return False 48 49class TERule: 50 def __init__(self, rule): 51 data = rule.split(',') 52 self.flavor = data[0] 53 self.sctx = data[1] 54 self.tctx = data[2] 55 self.tclass = data[3] 56 self.perms = set((data[4].strip()).split(' ')) 57 self.rule = rule 58 59class Policy: 60 __ExpandedRules = set() 61 __Rules = set() 62 __FcDict = None 63 __FcSorted = None 64 __GenfsDict = None 65 __libsepolwrap = None 66 __policydbP = None 67 __BUFSIZE = 2048 68 69 def AssertPathTypesDoNotHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr, ExcludedTypes = []): 70 # Query policy for the types associated with Attr 71 TypesPol = self.QueryTypeAttribute(Attr, True) - set(ExcludedTypes) 72 # Search file_contexts to find types associated with input paths. 73 TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix) 74 violators = TypesFc.intersection(TypesPol) 75 ret = "" 76 if len(violators) > 0: 77 ret += "The following types on " 78 ret += " ".join(str(x) for x in sorted(MatchPrefix)) 79 ret += " must not be associated with the " 80 ret += "\"" + Attr + "\" attribute: " 81 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 82 ret += " corresponding to files: " 83 ret += " ".join(str(x) for x in sorted(Files)) + "\n" 84 return ret 85 86 # Check that all types for "filesystem" have "attribute" associated with them 87 # for types labeled in genfs_contexts. 88 def AssertGenfsFilesystemTypesHaveAttr(self, Filesystem, Attr): 89 TypesPol = self.QueryTypeAttribute(Attr, True) 90 TypesGenfs = self.__GenfsDict[Filesystem] 91 violators = TypesGenfs.difference(TypesPol) 92 93 ret = "" 94 if len(violators) > 0: 95 ret += "The following types in " + Filesystem 96 ret += " must be associated with the " 97 ret += "\"" + Attr + "\" attribute: " 98 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 99 return ret 100 101 # Check that path prefixes that match MatchPrefix, and do not Match 102 # DoNotMatchPrefix have the attribute Attr. 103 # For example assert that all types in /sys, and not in /sys/kernel/debugfs 104 # have the sysfs_type attribute. 105 def AssertPathTypesHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr): 106 # Query policy for the types associated with Attr 107 TypesPol = self.QueryTypeAttribute(Attr, True) 108 # Search file_contexts to find paths/types that should be associated with 109 # Attr. 110 TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix) 111 violators = TypesFc.difference(TypesPol) 112 113 ret = "" 114 if len(violators) > 0: 115 ret += "The following types on " 116 ret += " ".join(str(x) for x in sorted(MatchPrefix)) 117 ret += " must be associated with the " 118 ret += "\"" + Attr + "\" attribute: " 119 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 120 ret += " corresponding to files: " 121 ret += " ".join(str(x) for x in sorted(Files)) + "\n" 122 return ret 123 124 def AssertPropertyOwnersAreExclusive(self): 125 systemProps = self.QueryTypeAttribute('system_property_type', True) 126 vendorProps = self.QueryTypeAttribute('vendor_property_type', True) 127 violators = systemProps.intersection(vendorProps) 128 ret = "" 129 if len(violators) > 0: 130 ret += "The following types have both system_property_type " 131 ret += "and vendor_property_type: " 132 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 133 return ret 134 135 # Return all file_contexts entries that map to the input Type. 136 def QueryFc(self, Type): 137 if Type in self.__FcDict: 138 return self.__FcDict[Type] 139 else: 140 return None 141 142 # Return all attributes associated with a type if IsAttr=False or 143 # all types associated with an attribute if IsAttr=True 144 def QueryTypeAttribute(self, Type, IsAttr): 145 TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, 146 create_string_buffer(Type.encode("ascii")), IsAttr) 147 if (TypeIterP == None): 148 sys.exit("Failed to initialize type iterator") 149 buf = create_string_buffer(self.__BUFSIZE) 150 TypeAttr = set() 151 while True: 152 ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE, 153 self.__policydbP, TypeIterP) 154 if ret == 0: 155 TypeAttr.add(buf.value.decode("ascii")) 156 continue 157 if ret == 1: 158 break; 159 # We should never get here. 160 sys.exit("Failed to import policy") 161 self.__libsepolwrap.destroy_type_iter(TypeIterP) 162 return TypeAttr 163 164 def __TERuleMatch(self, Rule, **kwargs): 165 # Match source type 166 if ("scontext" in kwargs and 167 len(kwargs['scontext']) > 0 and 168 Rule.sctx not in kwargs['scontext']): 169 return False 170 # Match target type 171 if ("tcontext" in kwargs and 172 len(kwargs['tcontext']) > 0 and 173 Rule.tctx not in kwargs['tcontext']): 174 return False 175 # Match target class 176 if ("tclass" in kwargs and 177 len(kwargs['tclass']) > 0 and 178 not bool(set([Rule.tclass]) & kwargs['tclass'])): 179 return False 180 # Match any perms 181 if ("perms" in kwargs and 182 len(kwargs['perms']) > 0 and 183 not bool(Rule.perms & kwargs['perms'])): 184 return False 185 return True 186 187 # resolve a type to its attributes or 188 # resolve an attribute to its types and attributes 189 # For example if scontext is the domain attribute, then we need to 190 # include all types with the domain attribute such as untrusted_app and 191 # priv_app and all the attributes of those types such as appdomain. 192 def ResolveTypeAttribute(self, Type): 193 types = self.GetAllTypes(False) 194 attributes = self.GetAllTypes(True) 195 196 if Type in types: 197 return self.QueryTypeAttribute(Type, False) 198 elif Type in attributes: 199 TypesAndAttributes = set() 200 Types = self.QueryTypeAttribute(Type, True) 201 TypesAndAttributes |= Types 202 for T in Types: 203 TypesAndAttributes |= self.QueryTypeAttribute(T, False) 204 return TypesAndAttributes 205 else: 206 return set() 207 208 # Return all TERules that match: 209 # (any scontext) or (any tcontext) or (any tclass) or (any perms), 210 # perms. 211 # Any unspecified paramenter will match all. 212 # 213 # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"]) 214 # Will return any rule with: 215 # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms) 216 def QueryTERule(self, **kwargs): 217 if len(self.__Rules) == 0: 218 self.__InitTERules() 219 220 # add any matching types and attributes for scontext and tcontext 221 if ("scontext" in kwargs and len(kwargs['scontext']) > 0): 222 scontext = set() 223 for sctx in kwargs['scontext']: 224 scontext |= self.ResolveTypeAttribute(sctx) 225 if (len(scontext) == 0): 226 return [] 227 kwargs['scontext'] = scontext 228 if ("tcontext" in kwargs and len(kwargs['tcontext']) > 0): 229 tcontext = set() 230 for tctx in kwargs['tcontext']: 231 tcontext |= self.ResolveTypeAttribute(tctx) 232 if (len(tcontext) == 0): 233 return [] 234 kwargs['tcontext'] = tcontext 235 for Rule in self.__Rules: 236 if self.__TERuleMatch(Rule, **kwargs): 237 yield Rule 238 239 # Same as QueryTERule but only using the expanded ruleset. 240 # i.e. all attributes have been expanded to their various types. 241 def QueryExpandedTERule(self, **kwargs): 242 if len(self.__ExpandedRules) == 0: 243 self.__InitExpandedTERules() 244 for Rule in self.__ExpandedRules: 245 if self.__TERuleMatch(Rule, **kwargs): 246 yield Rule 247 248 def GetAllTypes(self, isAttr): 249 TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, None, isAttr) 250 if (TypeIterP == None): 251 sys.exit("Failed to initialize type iterator") 252 buf = create_string_buffer(self.__BUFSIZE) 253 AllTypes = set() 254 while True: 255 ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE, 256 self.__policydbP, TypeIterP) 257 if ret == 0: 258 AllTypes.add(buf.value.decode("ascii")) 259 continue 260 if ret == 1: 261 break; 262 # We should never get here. 263 sys.exit("Failed to import policy") 264 self.__libsepolwrap.destroy_type_iter(TypeIterP) 265 return AllTypes 266 267 def __ExactMatchPathPrefix(self, pathregex, prefix): 268 pattern = re.compile('^' + pathregex + "$") 269 if pattern.match(prefix): 270 return True 271 return False 272 273 # Return a tuple (prefix, i) where i is the index of the most specific 274 # match of prefix in the sorted file_contexts. This is useful for limiting a 275 # file_contexts search to matches that are more specific and omitting less 276 # specific matches. For example, finding all matches to prefix /data/vendor 277 # should not include /data(/.*)? if /data/vendor(/.*)? is also specified. 278 def __FcSortedIndex(self, prefix): 279 index = 0 280 for i in range(0, len(self.__FcSorted)): 281 if self.__ExactMatchPathPrefix(self.__FcSorted[i].path, prefix): 282 index = i 283 return prefix, index 284 285 # Return a tuple of (path, Type) for all matching paths. Use the sorted 286 # file_contexts and index returned from __FcSortedIndex() to limit results 287 # to results that are more specific than the prefix. 288 def __MatchPathPrefixTypes(self, prefix, index): 289 PathType = [] 290 for i in range(index, len(self.__FcSorted)): 291 if MatchPathPrefix(self.__FcSorted[i].path, prefix): 292 PathType.append((self.__FcSorted[i].path, self.__FcSorted[i].type)) 293 return PathType 294 295 # Return types that match MatchPrefixes but do not match 296 # DoNotMatchPrefixes 297 def __GetTypesAndFilesByFilePathPrefix(self, MatchPrefixes, DoNotMatchPrefixes): 298 Types = set() 299 Files = set() 300 301 MatchPrefixesWithIndex = [] 302 for MatchPrefix in MatchPrefixes: 303 MatchPrefixesWithIndex.append(self.__FcSortedIndex(MatchPrefix)) 304 305 for MatchPrefixWithIndex in MatchPrefixesWithIndex: 306 PathTypes = self.__MatchPathPrefixTypes(*MatchPrefixWithIndex) 307 for PathType in PathTypes: 308 if MatchPathPrefixes(PathType[0], DoNotMatchPrefixes): 309 continue 310 Types.add(PathType[1]) 311 Files.add(PathType[0]) 312 return Types, Files 313 314 def __GetTERules(self, policydbP, avtabIterP, Rules): 315 if Rules is None: 316 Rules = set() 317 buf = create_string_buffer(self.__BUFSIZE) 318 ret = 0 319 while True: 320 ret = self.__libsepolwrap.get_allow_rule(buf, self.__BUFSIZE, 321 policydbP, avtabIterP) 322 if ret == 0: 323 Rule = TERule(buf.value.decode("ascii")) 324 Rules.add(Rule) 325 continue 326 if ret == 1: 327 break; 328 # We should never get here. 329 sys.exit("Failed to import policy") 330 331 def __InitTERules(self): 332 avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP) 333 if (avtabIterP == None): 334 sys.exit("Failed to initialize avtab") 335 self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules) 336 self.__libsepolwrap.destroy_avtab(avtabIterP) 337 avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP) 338 if (avtabIterP == None): 339 sys.exit("Failed to initialize conditional avtab") 340 self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules) 341 self.__libsepolwrap.destroy_avtab(avtabIterP) 342 343 def __InitExpandedTERules(self): 344 avtabIterP = self.__libsepolwrap.init_expanded_avtab(self.__policydbP) 345 if (avtabIterP == None): 346 sys.exit("Failed to initialize avtab") 347 self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules) 348 self.__libsepolwrap.destroy_expanded_avtab(avtabIterP) 349 avtabIterP = self.__libsepolwrap.init_expanded_cond_avtab(self.__policydbP) 350 if (avtabIterP == None): 351 sys.exit("Failed to initialize conditional avtab") 352 self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules) 353 self.__libsepolwrap.destroy_expanded_avtab(avtabIterP) 354 355 # load ctypes-ified libsepol wrapper 356 def __InitLibsepolwrap(self, LibPath): 357 lib = CDLL(LibPath) 358 359 # int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp); 360 lib.get_allow_rule.restype = c_int 361 lib.get_allow_rule.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]; 362 # void *load_policy(const char *policy_path); 363 lib.load_policy.restype = c_void_p 364 lib.load_policy.argtypes = [c_char_p] 365 # void destroy_policy(void *policydbp); 366 lib.destroy_policy.argtypes = [c_void_p] 367 # void *init_expanded_avtab(void *policydbp); 368 lib.init_expanded_avtab.restype = c_void_p 369 lib.init_expanded_avtab.argtypes = [c_void_p] 370 # void *init_expanded_cond_avtab(void *policydbp); 371 lib.init_expanded_cond_avtab.restype = c_void_p 372 lib.init_expanded_cond_avtab.argtypes = [c_void_p] 373 # void destroy_expanded_avtab(void *avtab_iterp); 374 lib.destroy_expanded_avtab.argtypes = [c_void_p] 375 # void *init_avtab(void *policydbp); 376 lib.init_avtab.restype = c_void_p 377 lib.init_avtab.argtypes = [c_void_p] 378 # void *init_cond_avtab(void *policydbp); 379 lib.init_cond_avtab.restype = c_void_p 380 lib.init_cond_avtab.argtypes = [c_void_p] 381 # void destroy_avtab(void *avtab_iterp); 382 lib.destroy_avtab.argtypes = [c_void_p] 383 # int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp); 384 lib.get_type.restype = c_int 385 lib.get_type.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p] 386 # void *init_type_iter(void *policydbp, const char *type, bool is_attr); 387 lib.init_type_iter.restype = c_void_p 388 lib.init_type_iter.argtypes = [c_void_p, c_char_p, c_bool] 389 # void destroy_type_iter(void *type_iterp); 390 lib.destroy_type_iter.argtypes = [c_void_p] 391 # void *init_genfs_iter(void *policydbp) 392 lib.init_genfs_iter.restype = c_void_p 393 lib.init_genfs_iter.argtypes = [c_void_p] 394 # int get_genfs(char *out, size_t max_size, void *genfs_iterp); 395 lib.get_genfs.restype = c_int 396 lib.get_genfs.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p] 397 # void destroy_genfs_iter(void *genfs_iterp) 398 lib.destroy_genfs_iter.argtypes = [c_void_p] 399 400 self.__libsepolwrap = lib 401 402 def __GenfsDictAdd(self, Dict, buf): 403 fs, buf = buf.split(' ', 1) 404 path, context = buf.rsplit(' ', 1) 405 Type = context.split(":")[2] 406 if not fs in Dict: 407 Dict[fs] = {Type} 408 else: 409 Dict[fs].add(Type) 410 411 def __InitGenfsCon(self): 412 self.__GenfsDict = {} 413 GenfsIterP = self.__libsepolwrap.init_genfs_iter(self.__policydbP) 414 if (GenfsIterP == None): 415 sys.exit("Failed to retreive genfs entries") 416 buf = create_string_buffer(self.__BUFSIZE) 417 while True: 418 ret = self.__libsepolwrap.get_genfs(buf, self.__BUFSIZE, 419 self.__policydbP, GenfsIterP) 420 if ret == 0: 421 self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii")) 422 continue 423 if ret == 1: 424 self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii")) 425 break; 426 # We should never get here. 427 sys.exit("Failed to get genfs entries") 428 self.__libsepolwrap.destroy_genfs_iter(GenfsIterP) 429 430 # load file_contexts 431 def __InitFC(self, FcPaths): 432 if FcPaths is None: 433 return 434 fc = [] 435 for path in FcPaths: 436 if not os.path.exists(path): 437 sys.exit("file_contexts file " + path + " does not exist.") 438 fd = open(path, "r") 439 fc += fd.readlines() 440 fd.close() 441 self.__FcDict = {} 442 for i in fc: 443 rec = i.split() 444 try: 445 t = rec[-1].split(":")[2] 446 if t in self.__FcDict: 447 self.__FcDict[t].append(rec[0]) 448 else: 449 self.__FcDict[t] = [rec[0]] 450 except: 451 pass 452 self.__FcSorted = fc_sort.sort(FcPaths) 453 454 # load policy 455 def __InitPolicy(self, PolicyPath): 456 cPolicyPath = create_string_buffer(PolicyPath.encode("ascii")) 457 self.__policydbP = self.__libsepolwrap.load_policy(cPolicyPath) 458 if (self.__policydbP is None): 459 sys.exit("Failed to load policy") 460 461 def __init__(self, PolicyPath, FcPaths, LibPath): 462 self.__InitLibsepolwrap(LibPath) 463 self.__InitFC(FcPaths) 464 self.__InitPolicy(PolicyPath) 465 self.__InitGenfsCon() 466 467 def __del__(self): 468 if self.__policydbP is not None: 469 self.__libsepolwrap.destroy_policy(self.__policydbP) 470