• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from optparse import OptionParser
2from optparse import Option, OptionValueError
3import os
4import mini_parser
5import policy
6from policy import MatchPathPrefix
7import re
8import sys
9
# Set to True to print every parsed domain (see PrintScontexts) before the
# tests are run.
DEBUG = False

'''
Use file_contexts and policy to verify Treble requirements
are not violated.
'''
###
# Differentiate between domains that are part of the core Android platform and
# domains introduced by vendors
#
# App domains that belong to the core platform.
coreAppdomain = {
    'bluetooth',
    'ephemeral_app',
    'isolated_app',
    'nfc',
    'platform_app',
    'priv_app',
    'radio',
    'shared_relro',
    'shell',
    'system_app',
    'untrusted_app',
    'untrusted_app_25',
    'untrusted_v2_app',
}
# Domains that GetCoreDomains() does not classify by entrypoint path: the
# core app domains above plus the platform domains listed here.
coredomainWhitelist = coreAppdomain | {
    'adbd',
    'kernel',
    'postinstall',
    'postinstall_dexopt',
    'recovery',
    'system_server',
    'vendor_init',
}
44
class scontext:
    """Per-domain record filled in by the Get*() helpers of this script."""

    def __init__(self):
        # Where the domain's entrypoint files live (set by GetCoreDomains).
        self.fromSystem = self.fromVendor = False
        # Whether the domain carries the "coredomain" / "appdomain" attribute.
        self.coredomain = self.appdomain = False
        # Attribute names associated with the domain (see GetAttributes).
        self.attributes = set()
        # Entrypoint type names and the file paths found for them
        # (see GetDomainEntrypoints).
        self.entrypoints = []
        self.entrypointpaths = []
54
def PrintScontexts():
    """Debug helper: print every known domain and its recorded fields.

    Domains are printed in sorted name order, one tab-indented
    "field=value" line per field, followed by the entrypoint paths one per
    line.
    """
    scalar_fields = ("coredomain", "appdomain", "fromSystem", "fromVendor",
                     "attributes", "entrypoints")
    # Domain names are unique, so sorting the items orders by name only.
    for name, sctx in sorted(alldomains.items()):
        # Single-argument print(...) produces the same output whether print
        # is a statement or a function.
        print(name)
        for field in scalar_fields:
            print("\t" + field + "=" + str(getattr(sctx, field)))
        print("\tentrypointpaths=")
        if sctx.entrypointpaths is not None:
            for path in sctx.entrypointpaths:
                print("\t\t" + str(path))
69
# Policy object under test; bound in the __main__ section.
pol = None

# Domain name -> scontext record; populated by GetAllDomains().
alldomains = {}
# Names of the domains carrying the "appdomain" / "coredomain" attribute.
appdomains = set()
coredomains = set()
# NOTE(review): nothing in the reviewed portion of this file reads or writes
# vendordomains.
vendordomains = set()

# compat vars
compatMapping = None
alltypes = set()
oldalltypes = set()

# Distinguish between PRODUCT_FULL_TREBLE and PRODUCT_FULL_TREBLE_OVERRIDE
FakeTreble = False
83
def GetAllDomains(pol):
    """Create an empty scontext record for each result of the "domain" query.

    pol: policy object; every item yielded by
         pol.QueryTypeAttribute("domain", True) becomes a key of alldomains.
         (Presumably these are the types carrying the "domain" attribute --
         confirm against the policy module.)
    """
    global alldomains
    alldomains.update(
        (name, scontext()) for name in pol.QueryTypeAttribute("domain", True))
88
def GetAppDomains():
    """Flag every domain that has the "appdomain" attribute.

    Sets scontext.appdomain and records the domain name in appdomains.
    Relies on the attributes having been collected beforehand.
    """
    global appdomains
    global alldomains
    for name, sctx in alldomains.items():
        # The application of the "appdomain" attribute is trusted because core
        # selinux policy contains neverallow rules that enforce that only zygote
        # and runas spawned processes may transition to processes that have
        # the appdomain attribute.
        if "appdomain" not in sctx.attributes:
            continue
        sctx.appdomain = True
        appdomains.add(name)
100
def GetCoreDomains():
    """Record coredomain membership and where each domain is launched from.

    For every domain: set scontext.coredomain (and add the name to
    coredomains) when it has the "coredomain" attribute. Then, unless the
    domain is whitelisted, is an app domain, or has no entrypoint paths, set
    fromSystem / fromVendor according to its entrypoint paths.
    """
    global alldomains
    global coredomains
    for name, sctx in alldomains.items():
        # TestCoredomainViolations will verify if coredomain was incorrectly
        # applied.
        if "coredomain" in sctx.attributes:
            sctx.coredomain = True
            coredomains.add(name)
        # check whether domains are executed off of /system or /vendor
        #
        # TODO, add checks to prevent app domains from being incorrectly
        # labeled as coredomain. Apps don't have entrypoints as they're always
        # dynamically transitioned to by zygote.
        if name in coredomainWhitelist or name in appdomains:
            continue
        # An empty (or missing) path list leaves both flags untouched.
        for path in sctx.entrypointpaths or ():
            # Processes with entrypoint on /system
            launched_from_system = (
                (MatchPathPrefix(path, "/system") and
                 not MatchPathPrefix(path, "/system/vendor")) or
                MatchPathPrefix(path, "/init") or
                MatchPathPrefix(path, "/charger"))
            if launched_from_system:
                sctx.fromSystem = True
            # Processes with entrypoint on /vendor or /system/vendor
            launched_from_vendor = (
                MatchPathPrefix(path, "/vendor") or
                MatchPathPrefix(path, "/system/vendor"))
            if launched_from_vendor:
                sctx.fromVendor = True
131
132###
133# Add the entrypoint type and path(s) to each domain.
134#
def GetDomainEntrypoints(pol):
    """Attach entrypoint types and their file paths to the known domains.

    pol: policy object providing QueryExpandedTERule() and QueryFc().

    Rules whose source context is not a key of alldomains are ignored.
    """
    global alldomains
    entrypoint_rules = pol.QueryExpandedTERule(
        tclass=set(["file"]), perms=set(["entrypoint"]))
    for rule in entrypoint_rules:
        if rule.sctx not in alldomains:
            continue
        domain = alldomains[rule.sctx]
        domain.entrypoints.append(str(rule.tctx))
        # postinstall_file represents a special case specific to A/B OTAs.
        # Update_engine mounts a partition and relabels it postinstall_file.
        # There is no file_contexts entry associated with postinstall_file
        # so skip the lookup.
        if rule.tctx == "postinstall_file":
            continue
        paths = pol.QueryFc(rule.tctx)
        # QueryFc may come back empty/None; only record real results.
        if paths:
            domain.entrypointpaths.extend(paths)
151###
152# Get attributes associated with each domain
153#
def GetAttributes(pol):
    """Collect, for each known domain, the results of its attribute query.

    pol: policy object; everything yielded by
         pol.QueryTypeAttribute(domain, False) is added to that domain's
         scontext.attributes set.
    """
    global alldomains
    for name, sctx in alldomains.items():
        sctx.attributes.update(pol.QueryTypeAttribute(name, False))
159
def GetAllTypes(pol, oldpol):
    """Store the type sets of the current and the old policy.

    pol:    policy whose GetAllTypes(False) result becomes alltypes.
    oldpol: policy whose GetAllTypes(False) result becomes oldalltypes.
    """
    global alltypes, oldalltypes
    alltypes = pol.GetAllTypes(False)
    oldalltypes = oldpol.GetAllTypes(False)
165
def setup(pol):
    """Populate the per-domain state used by the tests from policy pol.

    Order matters: domains must exist before attributes and entrypoints are
    attached, and the app/core classification reads those attributes.
    """
    for collect in (GetAllDomains, GetAttributes, GetDomainEntrypoints):
        collect(pol)
    GetAppDomains()
    GetCoreDomains()
172
173# setup for the policy compatibility tests
def compatSetup(pol, oldpol, mapping):
    """Prepare the state used by the policy compatibility tests.

    pol, oldpol: policies handed to GetAllTypes() to fill the type sets.
    mapping:     parsed compatibility mapping, stored in compatMapping.
    """
    global compatMapping

    GetAllTypes(pol, oldpol)
    # Only recorded once the type sets were gathered successfully.
    compatMapping = mapping
179
def DomainsWithAttribute(attr):
    """Return a list of the domain names whose attributes contain attr."""
    global alldomains
    return [name for name, sctx in alldomains.items()
            if attr in sctx.attributes]
187
188#############################################################
189# Tests
190#############################################################
def TestCoredomainViolations():
    """Check that "coredomain" matches where each domain is launched from.

    Returns an empty string when there are no violations, otherwise a
    message listing the offending domains (sorted, space separated) for
    each of the two checks.
    """
    global alldomains
    report = ""

    # verify that all domains launched from /system have the coredomain
    # attribute
    missing_coredomain = sorted(
        name for name, sctx in alldomains.items()
        if sctx.fromSystem and "coredomain" not in sctx.attributes)
    if missing_coredomain:
        report += ("The following domain(s) must be associated with the "
                   "\"coredomain\" attribute because they are executed off of "
                   "/system:\n")
        report += " ".join(str(x) for x in missing_coredomain) + "\n"

    # verify that all domains launched from /vendor do not have the coredomain
    # attribute
    unexpected_coredomain = sorted(
        name for name, sctx in alldomains.items()
        if sctx.fromVendor and "coredomain" in sctx.attributes)
    if unexpected_coredomain:
        report += ("The following domains must not be associated with the "
                   "\"coredomain\" attribute because they are executed off of "
                   "/vendor or /system/vendor:\n")
        report += " ".join(str(x) for x in unexpected_coredomain) + "\n"

    return report
221
222###
223# Make sure that any new type introduced in the new policy that was not present
224# in the old policy has been recorded in the mapping file.
def TestNoUnmappedNewTypes():
    """Report types added since the old policy that the mapping omits.

    A type counts as unmapped when compatMapping.rTypeattributesets has no
    entry for it. Returns "" when nothing is missing, otherwise a message
    listing the offending types in sorted order.
    """
    global alltypes, oldalltypes, compatMapping
    added = alltypes - oldalltypes
    unmapped = sorted(
        t for t in added
        if compatMapping.rTypeattributesets.get(t) is None)
    if not unmapped:
        return ""

    level = compatMapping.apiLevel
    header = ("SELinux: The following types were found added to the policy "
              "without an entry into the compatibility mapping file(s) found "
              "in private/compat/" + level + "/" +
              level + "[.ignore].cil\n")
    return header + " ".join(str(x) for x in unmapped) + "\n"
244
245###
246# Make sure that any public type removed in the current policy has its
247# declaration added to the mapping file for use in non-platform policy
def TestNoUnmappedRmTypes():
    """Report removed public types that the mapping does not declare.

    A removed type is a violation when it is in compatMapping.pubtypes but
    not in compatMapping.types. Returns "" when there is none, otherwise a
    message listing the offending types in sorted order.
    """
    global alltypes, oldalltypes, compatMapping
    removed = oldalltypes - alltypes
    undeclared = sorted(
        t for t in removed
        if t in compatMapping.pubtypes and t not in compatMapping.types)
    if not undeclared:
        return ""

    header = ("SELinux: The following formerly public types were removed from "
              "policy without a declaration in the compatibility mapping "
              "file(s) found in prebuilts/api/" + compatMapping.apiLevel +
              "/\n")
    return header + " ".join(str(x) for x in undeclared) + "\n"
266
def TestTrebleCompatMapping():
    """Run both mapping checks and return their concatenated messages.

    The new-type check runs first, then the removed-type check; the result
    is "" when neither finds a violation.
    """
    return TestNoUnmappedNewTypes() + TestNoUnmappedRmTypes()
271
def TestViolatorAttribute(attribute):
    """Report the domains that carry the given violator attribute.

    attribute: attribute name to look for on the known domains.

    Returns "" when FakeTreble is set or no domain has the attribute,
    otherwise a one-line message naming the domains in sorted order.
    """
    global FakeTreble
    # The check is skipped entirely for fake-Treble builds.
    if FakeTreble:
        return ""

    offenders = DomainsWithAttribute(attribute)
    if not offenders:
        return ""
    return ("SELinux: The following domains violate the Treble ban "
            "against use of the " + attribute + " attribute: " +
            " ".join(str(x) for x in sorted(offenders)) + "\n")
284
def TestViolatorAttributes():
    """Run TestViolatorAttribute for each banned attribute.

    Returns the messages concatenated in the order the attributes are
    listed below ("" when there is no violation).
    """
    banned_attributes = (
        "binder_in_vendor_violators",
        "socket_between_core_and_vendor_violators",
        "vendor_executes_system_violators",
    )
    return "".join(TestViolatorAttribute(attr) for attr in banned_attributes)
290
291# TODO move this to sepolicy_tests
def TestCoreDataTypeViolations():
    """Ask the policy to check the vendor data paths for core data types.

    Delegates to pol.AssertPathTypesDoNotHaveAttr() with the vendor data
    directories, an empty second list and the "core_data_file_type"
    attribute, and returns its result unchanged. The caller concatenates
    that result into its results string.
    """
    global pol
    vendor_data_dirs = ["/data/vendor/", "/data/vendor_ce/",
                        "/data/vendor_de/"]
    return pol.AssertPathTypesDoNotHaveAttr(vendor_data_dirs, [],
                                            "core_data_file_type")
296
297###
298# extend OptionParser to allow the same option flag to be used multiple times.
299# This is used to allow multiple file_contexts files and tests to be
300# specified.
301#
class MultipleOption(Option):
    """optparse Option with an extra "extend" action.

    Each occurrence of an "extend" option appends its value to a list stored
    at the option's destination, so the same flag may be given repeatedly.
    """
    # Register "extend" as an action that stores to a destination and always
    # takes a typed argument.
    ACTIONS = Option.ACTIONS + ("extend",)
    STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
    TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
    ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",)

    def take_action(self, action, dest, opt, value, values, parser):
        """Handle "extend" here; defer every other action to Option."""
        if action != "extend":
            Option.take_action(self, action, dest, opt, value, values, parser)
            return
        # ensure_value creates the list on first use.
        values.ensure_value(dest, []).append(value)
313
# Maps the names accepted by -t/--test to the functions implementing them.
# Each function takes no arguments and returns a (possibly empty) message.
Tests = {
    "CoredomainViolations": TestCoredomainViolations,
    "CoreDatatypeViolations": TestCoreDataTypeViolations,
    "TrebleCompatMapping": TestTrebleCompatMapping,
    "ViolatorAttributes": TestViolatorAttributes,
}
318
def NeedsCompatSetup(tests):
    """Return True when the TrebleCompatMapping test is going to run.

    tests: the list collected from the repeatable -t/--test option, or None
           when the option was not given (in which case every test runs).
    """
    return tests is None or "TrebleCompatMapping" in tests


# Command-line entry point: parse and validate the options, load the
# policies, run the selected tests and exit with their combined messages
# (non-zero status) when any test reported a violation.
if __name__ == '__main__':
    usage = "treble_sepolicy_tests -l $(ANDROID_HOST_OUT)/lib64/libsepolwrap.so "
    usage += "-f nonplat_file_contexts -f plat_file_contexts "
    usage += "-p curr_policy -b base_policy -o old_policy "
    usage += "-m mapping file [--test test] [--help]"
    parser = OptionParser(option_class=MultipleOption, usage=usage)
    parser.add_option("-b", "--basepolicy", dest="basepolicy", metavar="FILE")
    parser.add_option("-f", "--file_contexts", dest="file_contexts",
            metavar="FILE", action="extend", type="string")
    parser.add_option("-l", "--library-path", dest="libpath", metavar="FILE")
    parser.add_option("-m", "--mapping", dest="mapping", metavar="FILE")
    parser.add_option("-o", "--oldpolicy", dest="oldpolicy", metavar="FILE")
    parser.add_option("-p", "--policy", dest="policy", metavar="FILE")
    parser.add_option("-t", "--test", dest="tests", action="extend",
            help="Test options include "+str(Tests))
    parser.add_option("--fake-treble", action="store_true", dest="faketreble",
            default=False)

    (options, args) = parser.parse_args()

    # Options required by every test.
    if not options.libpath:
        sys.exit("Must specify path to libsepolwrap library\n" + parser.usage)
    if not os.path.exists(options.libpath):
        sys.exit("Error: library-path " + options.libpath + " does not exist\n"
                + parser.usage)
    if not options.policy:
        sys.exit("Must specify current monolithic policy file\n" + parser.usage)
    if not os.path.exists(options.policy):
        sys.exit("Error: policy file " + options.policy + " does not exist\n"
                + parser.usage)
    if not options.file_contexts:
        sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage)
    for f in options.file_contexts:
        if not os.path.exists(f):
            sys.exit("Error: File_contexts file " + f + " does not exist\n" +
                    parser.usage)

    # Mapping files are only necessary for the TrebleCompatMapping test.
    # options.tests is a list (-t uses the "extend" action), so the check has
    # to be a membership test; comparing the list by identity against a
    # string literal is never true and would skip this setup.
    if NeedsCompatSetup(options.tests):
        if not options.basepolicy:
            sys.exit("Must specify the current platform-only policy file\n" + parser.usage)
        if not options.mapping:
            sys.exit("Must specify a compatibility mapping file\n" + parser.usage)
        if not options.oldpolicy:
            sys.exit("Must specify the previous monolithic policy file\n" + parser.usage)
        basepol = policy.Policy(options.basepolicy, None, options.libpath)
        oldpol = policy.Policy(options.oldpolicy, None, options.libpath)
        mapping = mini_parser.MiniCilParser(options.mapping)
        compatSetup(basepol, oldpol, mapping)

    if options.faketreble:
        FakeTreble = True

    # pol is the module-level policy read by TestCoreDataTypeViolations.
    pol = policy.Policy(options.policy, options.file_contexts, options.libpath)
    setup(pol)

    if DEBUG:
        PrintScontexts()

    results = ""
    # If an individual test is not specified, run all tests.
    if options.tests is None:
        for t in Tests.values():
            results += t()
    else:
        for tn in options.tests:
            t = Tests.get(tn)
            if t:
                results += t()
            else:
                err = "Error: unknown test: " + tn + "\n"
                err += "Available tests:\n"
                for known in Tests.keys():
                    err += known + "\n"
                sys.exit(err)

    # A non-empty string makes sys.exit print it and exit with status 1.
    if results:
        sys.exit(results)
398