• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from optparse import OptionParser
16from optparse import Option, OptionValueError
17import os
18import mini_parser
19import policy
20from policy import MatchPathPrefix
21import re
22import sys
23
24DEBUG=False
25SHARED_LIB_EXTENSION = '.dylib' if sys.platform == 'darwin' else '.so'
26
27'''
28Use file_contexts and policy to verify Treble requirements
29are not violated.
30'''
31coredomainAllowlist = {
32        # TODO: how do we make sure vendor_init doesn't have bad coupling with
33        # /vendor? It is the only system process which is not coredomain.
34        'vendor_init',
35        # TODO(b/152813275): need to avoid allowlist for rootdir
36        "modprobe",
37        "slideshow",
38        }
39
40class scontext:
41    def __init__(self):
42        self.fromSystem = False
43        self.fromVendor = False
44        self.coredomain = False
45        self.appdomain = False
46        self.attributes = set()
47        self.entrypoints = []
48        self.entrypointpaths = []
49        self.error = ""
50
51def PrintScontexts():
52    for d in sorted(alldomains.keys()):
53        sctx = alldomains[d]
54        print(d)
55        print("\tcoredomain="+str(sctx.coredomain))
56        print("\tappdomain="+str(sctx.appdomain))
57        print("\tfromSystem="+str(sctx.fromSystem))
58        print("\tfromVendor="+str(sctx.fromVendor))
59        print("\tattributes="+str(sctx.attributes))
60        print("\tentrypoints="+str(sctx.entrypoints))
61        print("\tentrypointpaths=")
62        if sctx.entrypointpaths is not None:
63            for path in sctx.entrypointpaths:
64                print("\t\t"+str(path))
65
66alldomains = {}
67coredomains = set()
68appdomains = set()
69vendordomains = set()
70pol = None
71
72# compat vars
73alltypes = set()
74oldalltypes = set()
75compatMapping = None
76pubtypes = set()
77
78# Distinguish between PRODUCT_FULL_TREBLE and PRODUCT_FULL_TREBLE_OVERRIDE
79FakeTreble = False
80
81def GetAllDomains(pol):
82    global alldomains
83    for result in pol.QueryTypeAttribute("domain", True):
84        alldomains[result] = scontext()
85
86def GetAppDomains():
87    global appdomains
88    global alldomains
89    for d in alldomains:
90        # The application of the "appdomain" attribute is trusted because core
91        # selinux policy contains neverallow rules that enforce that only zygote
92        # and runas spawned processes may transition to processes that have
93        # the appdomain attribute.
94        if "appdomain" in alldomains[d].attributes:
95            alldomains[d].appdomain = True
96            appdomains.add(d)
97
98def GetCoreDomains():
99    global alldomains
100    global coredomains
101    for d in alldomains:
102        domain = alldomains[d]
103        # TestCoredomainViolations will verify if coredomain was incorrectly
104        # applied.
105        if "coredomain" in domain.attributes:
106            domain.coredomain = True
107            coredomains.add(d)
108        # check whether domains are executed off of /system or /vendor
109        if d in coredomainAllowlist:
110            continue
111        # TODO(b/153112003): add checks to prevent app domains from being
112        # incorrectly labeled as coredomain. Apps don't have entrypoints as
113        # they're always dynamically transitioned to by zygote.
114        if d in appdomains:
115            continue
116        # TODO(b/153112747): need to handle cases where there is a dynamic
117        # transition OR there happens to be no context in AOSP files.
118        if not domain.entrypointpaths:
119            continue
120
121        for path in domain.entrypointpaths:
122            vendor = any(MatchPathPrefix(path, prefix) for prefix in
123                         ["/vendor", "/odm"])
124            system = any(MatchPathPrefix(path, prefix) for prefix in
125                         ["/init", "/system_ext", "/product" ])
126
127            # only mark entrypoint as system if it is not in legacy /system/vendor
128            if MatchPathPrefix(path, "/system/vendor"):
129                vendor = True
130            elif MatchPathPrefix(path, "/system"):
131                system = True
132
133            if not vendor and not system:
134                domain.error += "Unrecognized entrypoint for " + d + " at " + path + "\n"
135
136            domain.fromSystem = domain.fromSystem or system
137            domain.fromVendor = domain.fromVendor or vendor
138
139###
140# Add the entrypoint type and path(s) to each domain.
141#
142def GetDomainEntrypoints(pol):
143    global alldomains
144    for x in pol.QueryExpandedTERule(tclass=set(["file"]), perms=set(["entrypoint"])):
145        if not x.sctx in alldomains:
146            continue
147        alldomains[x.sctx].entrypoints.append(str(x.tctx))
148        # postinstall_file represents a special case specific to A/B OTAs.
149        # Update_engine mounts a partition and relabels it postinstall_file.
150        # There is no file_contexts entry associated with postinstall_file
151        # so skip the lookup.
152        if x.tctx == "postinstall_file":
153            continue
154        entrypointpath = pol.QueryFc(x.tctx)
155        if not entrypointpath:
156            continue
157        alldomains[x.sctx].entrypointpaths.extend(entrypointpath)
158###
159# Get attributes associated with each domain
160#
161def GetAttributes(pol):
162    global alldomains
163    for domain in alldomains:
164        for result in pol.QueryTypeAttribute(domain, False):
165            alldomains[domain].attributes.add(result)
166
167def GetAllTypes(pol, oldpol):
168    global alltypes
169    global oldalltypes
170    alltypes = pol.GetAllTypes(False)
171    oldalltypes = oldpol.GetAllTypes(False)
172
173def setup(pol):
174    GetAllDomains(pol)
175    GetAttributes(pol)
176    GetDomainEntrypoints(pol)
177    GetAppDomains()
178    GetCoreDomains()
179
180# setup for the policy compatibility tests
181def compatSetup(pol, oldpol, mapping, types):
182    global compatMapping
183    global pubtypes
184
185    GetAllTypes(pol, oldpol)
186    compatMapping = mapping
187    pubtypes = types
188
189def DomainsWithAttribute(attr):
190    global alldomains
191    domains = []
192    for domain in alldomains:
193        if attr in alldomains[domain].attributes:
194            domains.append(domain)
195    return domains
196
197#############################################################
198# Tests
199#############################################################
200def TestCoredomainViolations():
201    global alldomains
202    # verify that all domains launched from /system have the coredomain
203    # attribute
204    ret = ""
205
206    for d in alldomains:
207        domain = alldomains[d]
208        if domain.fromSystem and domain.fromVendor:
209            ret += "The following domain is system and vendor: " + d + "\n"
210
211    for domain in alldomains.values():
212        ret += domain.error
213
214    violators = []
215    for d in alldomains:
216        domain = alldomains[d]
217        if domain.fromSystem and "coredomain" not in domain.attributes:
218                violators.append(d);
219    if len(violators) > 0:
220        ret += "The following domain(s) must be associated with the "
221        ret += "\"coredomain\" attribute because they are executed off of "
222        ret += "/system:\n"
223        ret += " ".join(str(x) for x in sorted(violators)) + "\n"
224
225    # verify that all domains launched form /vendor do not have the coredomain
226    # attribute
227    violators = []
228    for d in alldomains:
229        domain = alldomains[d]
230        if domain.fromVendor and "coredomain" in domain.attributes:
231            violators.append(d)
232    if len(violators) > 0:
233        ret += "The following domains must not be associated with the "
234        ret += "\"coredomain\" attribute because they are executed off of "
235        ret += "/vendor or /system/vendor:\n"
236        ret += " ".join(str(x) for x in sorted(violators)) + "\n"
237
238    return ret
239
240###
241# Make sure that any new public type introduced in the new policy that was not
242# present in the old policy has been recorded in the mapping file.
243def TestNoUnmappedNewTypes():
244    global alltypes
245    global oldalltypes
246    global compatMapping
247    global pubtypes
248    newt = alltypes - oldalltypes
249    ret = ""
250    violators = []
251
252    for n in newt:
253        if n in pubtypes and compatMapping.rTypeattributesets.get(n) is None:
254            violators.append(n)
255
256    if len(violators) > 0:
257        ret += "SELinux: The following public types were found added to the "
258        ret += "policy without an entry into the compatibility mapping file(s) "
259        ret += "found in private/compat/V.v/V.v[.ignore].cil, where V.v is the "
260        ret += "latest API level.\n"
261        ret += " ".join(str(x) for x in sorted(violators)) + "\n\n"
262        ret += "See examples of how to fix this:\n"
263        ret += "https://android-review.googlesource.com/c/platform/system/sepolicy/+/781036\n"
264        ret += "https://android-review.googlesource.com/c/platform/system/sepolicy/+/852612\n"
265    return ret
266
267###
268# Make sure that any public type removed in the current policy has its
269# declaration added to the mapping file for use in non-platform policy
270def TestNoUnmappedRmTypes():
271    global alltypes
272    global oldalltypes
273    global compatMapping
274    rmt = oldalltypes - alltypes
275    ret = ""
276    violators = []
277
278    for o in rmt:
279        if o in compatMapping.pubtypes and not o in compatMapping.types:
280            violators.append(o)
281
282    if len(violators) > 0:
283        ret += "SELinux: The following formerly public types were removed from "
284        ret += "policy without a declaration in the compatibility mapping "
285        ret += "found in private/compat/V.v/V.v[.ignore].cil, where V.v is the "
286        ret += "latest API level.\n"
287        ret += " ".join(str(x) for x in sorted(violators)) + "\n\n"
288        ret += "See examples of how to fix this:\n"
289        ret += "https://android-review.googlesource.com/c/platform/system/sepolicy/+/822743\n"
290    return ret
291
292def TestTrebleCompatMapping():
293    ret = TestNoUnmappedNewTypes()
294    ret += TestNoUnmappedRmTypes()
295    return ret
296
297def TestViolatorAttribute(attribute):
298    global FakeTreble
299    ret = ""
300    if FakeTreble:
301        return ret
302
303    violators = DomainsWithAttribute(attribute)
304    if len(violators) > 0:
305        ret += "SELinux: The following domains violate the Treble ban "
306        ret += "against use of the " + attribute + " attribute: "
307        ret += " ".join(str(x) for x in sorted(violators)) + "\n"
308    return ret
309
310def TestViolatorAttributes():
311    ret = ""
312    ret += TestViolatorAttribute("socket_between_core_and_vendor_violators")
313    ret += TestViolatorAttribute("vendor_executes_system_violators")
314    return ret
315
316# TODO move this to sepolicy_tests
317def TestCoreDataTypeViolations():
318    global pol
319    return pol.AssertPathTypesDoNotHaveAttr(["/data/vendor/", "/data/vendor_ce/",
320        "/data/vendor_de/"], [], "core_data_file_type")
321
322###
323# extend OptionParser to allow the same option flag to be used multiple times.
324# This is used to allow multiple file_contexts files and tests to be
325# specified.
326#
327class MultipleOption(Option):
328    ACTIONS = Option.ACTIONS + ("extend",)
329    STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
330    TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
331    ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",)
332
333    def take_action(self, action, dest, opt, value, values, parser):
334        if action == "extend":
335            values.ensure_value(dest, []).append(value)
336        else:
337            Option.take_action(self, action, dest, opt, value, values, parser)
338
339Tests = {"CoredomainViolations": TestCoredomainViolations,
340         "CoreDatatypeViolations": TestCoreDataTypeViolations,
341         "TrebleCompatMapping": TestTrebleCompatMapping,
342         "ViolatorAttributes": TestViolatorAttributes}
343
344if __name__ == '__main__':
345    usage = "treble_sepolicy_tests "
346    usage += "-f nonplat_file_contexts -f plat_file_contexts "
347    usage += "-p curr_policy -b base_policy -o old_policy "
348    usage +="-m mapping file [--test test] [--help]"
349    parser = OptionParser(option_class=MultipleOption, usage=usage)
350    parser.add_option("-b", "--basepolicy", dest="basepolicy", metavar="FILE")
351    parser.add_option("-u", "--base-pub-policy", dest="base_pub_policy",
352                      metavar="FILE")
353    parser.add_option("-f", "--file_contexts", dest="file_contexts",
354            metavar="FILE", action="extend", type="string")
355    parser.add_option("-m", "--mapping", dest="mapping", metavar="FILE")
356    parser.add_option("-o", "--oldpolicy", dest="oldpolicy", metavar="FILE")
357    parser.add_option("-p", "--policy", dest="policy", metavar="FILE")
358    parser.add_option("-t", "--test", dest="tests", action="extend",
359            help="Test options include "+str(Tests))
360    parser.add_option("--fake-treble", action="store_true", dest="faketreble",
361            default=False)
362
363    (options, args) = parser.parse_args()
364
365    if not options.policy:
366        sys.exit("Must specify current monolithic policy file\n" + parser.usage)
367    if not os.path.exists(options.policy):
368        sys.exit("Error: policy file " + options.policy + " does not exist\n"
369                + parser.usage)
370    if not options.file_contexts:
371        sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage)
372    for f in options.file_contexts:
373        if not os.path.exists(f):
374            sys.exit("Error: File_contexts file " + f + " does not exist\n" +
375                    parser.usage)
376
377    libpath = os.path.join(os.path.dirname(os.path.realpath(__file__)),
378                           "libsepolwrap" + SHARED_LIB_EXTENSION)
379    if not os.path.exists(libpath):
380        sys.exit("Error: libsepolwrap does not exist. Is this binary corrupted?\n")
381
382    # Mapping files and public platform policy are only necessary for the
383    # TrebleCompatMapping test.
384    if options.tests is None or options.tests == "TrebleCompatMapping":
385        if not options.basepolicy:
386            sys.exit("Must specify the current platform-only policy file\n"
387                     + parser.usage)
388        if not options.mapping:
389            sys.exit("Must specify a compatibility mapping file\n"
390                     + parser.usage)
391        if not options.oldpolicy:
392            sys.exit("Must specify the previous monolithic policy file\n"
393                     + parser.usage)
394        if not options.base_pub_policy:
395            sys.exit("Must specify the current platform-only public policy "
396                     + ".cil file\n" + parser.usage)
397        basepol = policy.Policy(options.basepolicy, None, libpath)
398        oldpol = policy.Policy(options.oldpolicy, None, libpath)
399        mapping = mini_parser.MiniCilParser(options.mapping)
400        pubpol = mini_parser.MiniCilParser(options.base_pub_policy)
401        compatSetup(basepol, oldpol, mapping, pubpol.types)
402
403    if options.faketreble:
404        FakeTreble = True
405
406    pol = policy.Policy(options.policy, options.file_contexts, libpath)
407    setup(pol)
408
409    if DEBUG:
410        PrintScontexts()
411
412    results = ""
413    # If an individual test is not specified, run all tests.
414    if options.tests is None:
415        for t in Tests.values():
416            results += t()
417    else:
418        for tn in options.tests:
419            t = Tests.get(tn)
420            if t:
421                results += t()
422            else:
423                err = "Error: unknown test: " + tn + "\n"
424                err += "Available tests:\n"
425                for tn in Tests.keys():
426                    err += tn + "\n"
427                sys.exit(err)
428
429    if len(results) > 0:
430        sys.exit(results)
431