1from optparse import OptionParser 2from optparse import Option, OptionValueError 3import os 4import mini_parser 5import policy 6from policy import MatchPathPrefix 7import re 8import sys 9 10DEBUG=False 11 12''' 13Use file_contexts and policy to verify Treble requirements 14are not violated. 15''' 16### 17# Differentiate between domains that are part of the core Android platform and 18# domains introduced by vendors 19coreAppdomain = { 20 'bluetooth', 21 'ephemeral_app', 22 'isolated_app', 23 'nfc', 24 'platform_app', 25 'priv_app', 26 'radio', 27 'shared_relro', 28 'shell', 29 'system_app', 30 'untrusted_app', 31 'untrusted_app_25', 32 'untrusted_v2_app', 33 } 34coredomainWhitelist = { 35 'adbd', 36 'kernel', 37 'postinstall', 38 'postinstall_dexopt', 39 'recovery', 40 'system_server', 41 'vendor_init', 42 } 43coredomainWhitelist |= coreAppdomain 44 45class scontext: 46 def __init__(self): 47 self.fromSystem = False 48 self.fromVendor = False 49 self.coredomain = False 50 self.appdomain = False 51 self.attributes = set() 52 self.entrypoints = [] 53 self.entrypointpaths = [] 54 55def PrintScontexts(): 56 for d in sorted(alldomains.keys()): 57 sctx = alldomains[d] 58 print d 59 print "\tcoredomain="+str(sctx.coredomain) 60 print "\tappdomain="+str(sctx.appdomain) 61 print "\tfromSystem="+str(sctx.fromSystem) 62 print "\tfromVendor="+str(sctx.fromVendor) 63 print "\tattributes="+str(sctx.attributes) 64 print "\tentrypoints="+str(sctx.entrypoints) 65 print "\tentrypointpaths=" 66 if sctx.entrypointpaths is not None: 67 for path in sctx.entrypointpaths: 68 print "\t\t"+str(path) 69 70alldomains = {} 71coredomains = set() 72appdomains = set() 73vendordomains = set() 74pol = None 75 76# compat vars 77alltypes = set() 78oldalltypes = set() 79compatMapping = None 80 81# Distinguish between PRODUCT_FULL_TREBLE and PRODUCT_FULL_TREBLE_OVERRIDE 82FakeTreble = False 83 84def GetAllDomains(pol): 85 global alldomains 86 for result in pol.QueryTypeAttribute("domain", True): 87 alldomains[result] = scontext() 88 89def GetAppDomains(): 90 global appdomains 91 global alldomains 92 for d in alldomains: 93 # The application of the "appdomain" attribute is trusted because core 94 # selinux policy contains neverallow rules that enforce that only zygote 95 # and runas spawned processes may transition to processes that have 96 # the appdomain attribute. 97 if "appdomain" in alldomains[d].attributes: 98 alldomains[d].appdomain = True 99 appdomains.add(d) 100 101def GetCoreDomains(): 102 global alldomains 103 global coredomains 104 for d in alldomains: 105 # TestCoredomainViolations will verify if coredomain was incorrectly 106 # applied. 107 if "coredomain" in alldomains[d].attributes: 108 alldomains[d].coredomain = True 109 coredomains.add(d) 110 # check whether domains are executed off of /system or /vendor 111 if d in coredomainWhitelist: 112 continue 113 # TODO, add checks to prevent app domains from being incorrectly 114 # labeled as coredomain. Apps don't have entrypoints as they're always 115 # dynamically transitioned to by zygote. 116 if d in appdomains: 117 continue 118 if not alldomains[d].entrypointpaths: 119 continue 120 for path in alldomains[d].entrypointpaths: 121 # Processes with entrypoint on /system 122 if ((MatchPathPrefix(path, "/system") and not 123 MatchPathPrefix(path, "/system/vendor")) or 124 MatchPathPrefix(path, "/init") or 125 MatchPathPrefix(path, "/charger")): 126 alldomains[d].fromSystem = True 127 # Processes with entrypoint on /vendor or /system/vendor 128 if (MatchPathPrefix(path, "/vendor") or 129 MatchPathPrefix(path, "/system/vendor")): 130 alldomains[d].fromVendor = True 131 132### 133# Add the entrypoint type and path(s) to each domain. 134# 135def GetDomainEntrypoints(pol): 136 global alldomains 137 for x in pol.QueryExpandedTERule(tclass=set(["file"]), perms=set(["entrypoint"])): 138 if not x.sctx in alldomains: 139 continue 140 alldomains[x.sctx].entrypoints.append(str(x.tctx)) 141 # postinstall_file represents a special case specific to A/B OTAs. 142 # Update_engine mounts a partition and relabels it postinstall_file. 143 # There is no file_contexts entry associated with postinstall_file 144 # so skip the lookup. 145 if x.tctx == "postinstall_file": 146 continue 147 entrypointpath = pol.QueryFc(x.tctx) 148 if not entrypointpath: 149 continue 150 alldomains[x.sctx].entrypointpaths.extend(entrypointpath) 151### 152# Get attributes associated with each domain 153# 154def GetAttributes(pol): 155 global alldomains 156 for domain in alldomains: 157 for result in pol.QueryTypeAttribute(domain, False): 158 alldomains[domain].attributes.add(result) 159 160def GetAllTypes(pol, oldpol): 161 global alltypes 162 global oldalltypes 163 alltypes = pol.GetAllTypes(False) 164 oldalltypes = oldpol.GetAllTypes(False) 165 166def setup(pol): 167 GetAllDomains(pol) 168 GetAttributes(pol) 169 GetDomainEntrypoints(pol) 170 GetAppDomains() 171 GetCoreDomains() 172 173# setup for the policy compatibility tests 174def compatSetup(pol, oldpol, mapping): 175 global compatMapping 176 177 GetAllTypes(pol, oldpol) 178 compatMapping = mapping 179 180def DomainsWithAttribute(attr): 181 global alldomains 182 domains = [] 183 for domain in alldomains: 184 if attr in alldomains[domain].attributes: 185 domains.append(domain) 186 return domains 187 188############################################################# 189# Tests 190############################################################# 191def TestCoredomainViolations(): 192 global alldomains 193 # verify that all domains launched from /system have the coredomain 194 # attribute 195 ret = "" 196 violators = [] 197 for d in alldomains: 198 domain = alldomains[d] 199 if domain.fromSystem and "coredomain" not in domain.attributes: 200 violators.append(d); 201 if len(violators) > 0: 202 ret += "The following domain(s) must be associated with the " 203 ret += "\"coredomain\" attribute because they are executed off of " 204 ret += "/system:\n" 205 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 206 207 # verify that all domains launched form /vendor do not have the coredomain 208 # attribute 209 violators = [] 210 for d in alldomains: 211 domain = alldomains[d] 212 if domain.fromVendor and "coredomain" in domain.attributes: 213 violators.append(d) 214 if len(violators) > 0: 215 ret += "The following domains must not be associated with the " 216 ret += "\"coredomain\" attribute because they are executed off of " 217 ret += "/vendor or /system/vendor:\n" 218 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 219 220 return ret 221 222### 223# Make sure that any new type introduced in the new policy that was not present 224# in the old policy has been recorded in the mapping file. 225def TestNoUnmappedNewTypes(): 226 global alltypes 227 global oldalltypes 228 global compatMapping 229 newt = alltypes - oldalltypes 230 ret = "" 231 violators = [] 232 233 for n in newt: 234 if compatMapping.rTypeattributesets.get(n) is None: 235 violators.append(n) 236 237 if len(violators) > 0: 238 ret += "SELinux: The following types were found added to the policy " 239 ret += "without an entry into the compatibility mapping file(s) found " 240 ret += "in private/compat/" + compatMapping.apiLevel + "/" 241 ret += compatMapping.apiLevel + "[.ignore].cil\n" 242 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 243 return ret 244 245### 246# Make sure that any public type removed in the current policy has its 247# declaration added to the mapping file for use in non-platform policy 248def TestNoUnmappedRmTypes(): 249 global alltypes 250 global oldalltypes 251 global compatMapping 252 rmt = oldalltypes - alltypes 253 ret = "" 254 violators = [] 255 256 for o in rmt: 257 if o in compatMapping.pubtypes and not o in compatMapping.types: 258 violators.append(o) 259 260 if len(violators) > 0: 261 ret += "SELinux: The following formerly public types were removed from " 262 ret += "policy without a declaration in the compatibility mapping " 263 ret += "file(s) found in prebuilts/api/" + compatMapping.apiLevel + "/\n" 264 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 265 return ret 266 267def TestTrebleCompatMapping(): 268 ret = TestNoUnmappedNewTypes() 269 ret += TestNoUnmappedRmTypes() 270 return ret 271 272def TestViolatorAttribute(attribute): 273 global FakeTreble 274 ret = "" 275 if FakeTreble: 276 return ret 277 278 violators = DomainsWithAttribute(attribute) 279 if len(violators) > 0: 280 ret += "SELinux: The following domains violate the Treble ban " 281 ret += "against use of the " + attribute + " attribute: " 282 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 283 return ret 284 285def TestViolatorAttributes(): 286 ret = TestViolatorAttribute("binder_in_vendor_violators") 287 ret += TestViolatorAttribute("socket_between_core_and_vendor_violators") 288 ret += TestViolatorAttribute("vendor_executes_system_violators") 289 return ret 290 291# TODO move this to sepolicy_tests 292def TestCoreDataTypeViolations(): 293 global pol 294 return pol.AssertPathTypesDoNotHaveAttr(["/data/vendor/", "/data/vendor_ce/", 295 "/data/vendor_de/"], [], "core_data_file_type") 296 297### 298# extend OptionParser to allow the same option flag to be used multiple times. 299# This is used to allow multiple file_contexts files and tests to be 300# specified. 301# 302class MultipleOption(Option): 303 ACTIONS = Option.ACTIONS + ("extend",) 304 STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",) 305 TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",) 306 ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",) 307 308 def take_action(self, action, dest, opt, value, values, parser): 309 if action == "extend": 310 values.ensure_value(dest, []).append(value) 311 else: 312 Option.take_action(self, action, dest, opt, value, values, parser) 313 314Tests = {"CoredomainViolations": TestCoredomainViolations, 315 "CoreDatatypeViolations": TestCoreDataTypeViolations, 316 "TrebleCompatMapping": TestTrebleCompatMapping, 317 "ViolatorAttributes": TestViolatorAttributes} 318 319if __name__ == '__main__': 320 usage = "treble_sepolicy_tests -l $(ANDROID_HOST_OUT)/lib64/libsepolwrap.so " 321 usage += "-f nonplat_file_contexts -f plat_file_contexts " 322 usage += "-p curr_policy -b base_policy -o old_policy " 323 usage +="-m mapping file [--test test] [--help]" 324 parser = OptionParser(option_class=MultipleOption, usage=usage) 325 parser.add_option("-b", "--basepolicy", dest="basepolicy", metavar="FILE") 326 parser.add_option("-f", "--file_contexts", dest="file_contexts", 327 metavar="FILE", action="extend", type="string") 328 parser.add_option("-l", "--library-path", dest="libpath", metavar="FILE") 329 parser.add_option("-m", "--mapping", dest="mapping", metavar="FILE") 330 parser.add_option("-o", "--oldpolicy", dest="oldpolicy", metavar="FILE") 331 parser.add_option("-p", "--policy", dest="policy", metavar="FILE") 332 parser.add_option("-t", "--test", dest="tests", action="extend", 333 help="Test options include "+str(Tests)) 334 parser.add_option("--fake-treble", action="store_true", dest="faketreble", 335 default=False) 336 337 (options, args) = parser.parse_args() 338 339 if not options.libpath: 340 sys.exit("Must specify path to libsepolwrap library\n" + parser.usage) 341 if not os.path.exists(options.libpath): 342 sys.exit("Error: library-path " + options.libpath + " does not exist\n" 343 + parser.usage) 344 if not options.policy: 345 sys.exit("Must specify current monolithic policy file\n" + parser.usage) 346 if not os.path.exists(options.policy): 347 sys.exit("Error: policy file " + options.policy + " does not exist\n" 348 + parser.usage) 349 if not options.file_contexts: 350 sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage) 351 for f in options.file_contexts: 352 if not os.path.exists(f): 353 sys.exit("Error: File_contexts file " + f + " does not exist\n" + 354 parser.usage) 355 356 # Mapping files are only necessary for the TrebleCompatMapping test 357 if options.tests is None or options.tests is "TrebleCompatMapping": 358 if not options.basepolicy: 359 sys.exit("Must specify the current platform-only policy file\n" + parser.usage) 360 if not options.mapping: 361 sys.exit("Must specify a compatibility mapping file\n" + parser.usage) 362 if not options.oldpolicy: 363 sys.exit("Must specify the previous monolithic policy file\n" + parser.usage) 364 basepol = policy.Policy(options.basepolicy, None, options.libpath) 365 oldpol = policy.Policy(options.oldpolicy, None, options.libpath) 366 mapping = mini_parser.MiniCilParser(options.mapping) 367 compatSetup(basepol, oldpol, mapping) 368 369 370 if options.faketreble: 371 FakeTreble = True 372 373 pol = policy.Policy(options.policy, options.file_contexts, options.libpath) 374 setup(pol) 375 376 if DEBUG: 377 PrintScontexts() 378 379 results = "" 380 # If an individual test is not specified, run all tests. 381 if options.tests is None: 382 for t in Tests.values(): 383 results += t() 384 else: 385 for tn in options.tests: 386 t = Tests.get(tn) 387 if t: 388 results += t() 389 else: 390 err = "Error: unknown test: " + tn + "\n" 391 err += "Available tests:\n" 392 for tn in Tests.keys(): 393 err += tn + "\n" 394 sys.exit(err) 395 396 if len(results) > 0: 397 sys.exit(results) 398