• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from ctypes import *
16import re
17import os
18import sys
19import platform
20import fc_sort
21
22###
23# Check whether the regex will match a file path starting with the provided
24# prefix
25#
26# Compares regex entries in file_contexts with a path prefix. Regex entries
27# are often more specific than this file prefix. For example, the regex could
28# be /system/bin/foo\.sh and the prefix could be /system. This function
29# loops over the regex removing characters from the end until
30# 1) there is a match - return True or 2) run out of characters - return
31#    False.
32#
33def MatchPathPrefix(pathregex, prefix):
34    for i in range(len(pathregex), 0, -1):
35        try:
36            pattern = re.compile('^' + pathregex[0:i] + "$")
37        except:
38            continue
39        if pattern.match(prefix):
40            return True
41    return False
42
43def MatchPathPrefixes(pathregex, Prefixes):
44    for Prefix in Prefixes:
45        if MatchPathPrefix(pathregex, Prefix):
46            return True
47    return False
48
49class TERule:
50    def __init__(self, rule):
51        data = rule.split(',')
52        self.flavor = data[0]
53        self.sctx = data[1]
54        self.tctx = data[2]
55        self.tclass = data[3]
56        self.perms = set((data[4].strip()).split(' '))
57        self.rule = rule
58
59class Policy:
60    __ExpandedRules = set()
61    __Rules = set()
62    __FcDict = None
63    __FcSorted = None
64    __GenfsDict = None
65    __libsepolwrap = None
66    __policydbP = None
67    __BUFSIZE = 2048
68
69    def AssertPathTypesDoNotHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr, ExcludedTypes = []):
70        # Query policy for the types associated with Attr
71        TypesPol = self.QueryTypeAttribute(Attr, True) - set(ExcludedTypes)
72        # Search file_contexts to find types associated with input paths.
73        TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix)
74        violators = TypesFc.intersection(TypesPol)
75        ret = ""
76        if len(violators) > 0:
77            ret += "The following types on "
78            ret += " ".join(str(x) for x in sorted(MatchPrefix))
79            ret += " must not be associated with the "
80            ret += "\"" + Attr + "\" attribute: "
81            ret += " ".join(str(x) for x in sorted(violators)) + "\n"
82            ret += " corresponding to files: "
83            ret += " ".join(str(x) for x in sorted(Files)) + "\n"
84        return ret
85
86    # Check that all types for "filesystem" have "attribute" associated with them
87    # for types labeled in genfs_contexts.
88    def AssertGenfsFilesystemTypesHaveAttr(self, Filesystem, Attr):
89        TypesPol = self.QueryTypeAttribute(Attr, True)
90        TypesGenfs = self.__GenfsDict[Filesystem]
91        violators = TypesGenfs.difference(TypesPol)
92
93        ret = ""
94        if len(violators) > 0:
95            ret += "The following types in " + Filesystem
96            ret += " must be associated with the "
97            ret += "\"" + Attr + "\" attribute: "
98            ret += " ".join(str(x) for x in sorted(violators)) + "\n"
99        return ret
100
101    # Check that path prefixes that match MatchPrefix, and do not Match
102    # DoNotMatchPrefix have the attribute Attr.
103    # For example assert that all types in /sys, and not in /sys/kernel/debugfs
104    # have the sysfs_type attribute.
105    def AssertPathTypesHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr):
106        # Query policy for the types associated with Attr
107        TypesPol = self.QueryTypeAttribute(Attr, True)
108        # Search file_contexts to find paths/types that should be associated with
109        # Attr.
110        TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix)
111        violators = TypesFc.difference(TypesPol)
112
113        ret = ""
114        if len(violators) > 0:
115            ret += "The following types on "
116            ret += " ".join(str(x) for x in sorted(MatchPrefix))
117            ret += " must be associated with the "
118            ret += "\"" + Attr + "\" attribute: "
119            ret += " ".join(str(x) for x in sorted(violators)) + "\n"
120            ret += " corresponding to files: "
121            ret += " ".join(str(x) for x in sorted(Files)) + "\n"
122        return ret
123
124    def AssertPropertyOwnersAreExclusive(self):
125        systemProps = self.QueryTypeAttribute('system_property_type', True)
126        vendorProps = self.QueryTypeAttribute('vendor_property_type', True)
127        violators = systemProps.intersection(vendorProps)
128        ret = ""
129        if len(violators) > 0:
130            ret += "The following types have both system_property_type "
131            ret += "and vendor_property_type: "
132            ret += " ".join(str(x) for x in sorted(violators)) + "\n"
133        return ret
134
135    # Return all file_contexts entries that map to the input Type.
136    def QueryFc(self, Type):
137        if Type in self.__FcDict:
138            return self.__FcDict[Type]
139        else:
140            return None
141
142    # Return all attributes associated with a type if IsAttr=False or
143    # all types associated with an attribute if IsAttr=True
144    def QueryTypeAttribute(self, Type, IsAttr):
145        TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP,
146                        create_string_buffer(Type.encode("ascii")), IsAttr)
147        if (TypeIterP == None):
148            sys.exit("Failed to initialize type iterator")
149        buf = create_string_buffer(self.__BUFSIZE)
150        TypeAttr = set()
151        while True:
152            ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE,
153                    self.__policydbP, TypeIterP)
154            if ret == 0:
155                TypeAttr.add(buf.value.decode("ascii"))
156                continue
157            if ret == 1:
158                break;
159            # We should never get here.
160            sys.exit("Failed to import policy")
161        self.__libsepolwrap.destroy_type_iter(TypeIterP)
162        return TypeAttr
163
164    def __TERuleMatch(self, Rule, **kwargs):
165        # Match source type
166        if ("scontext" in kwargs and
167                len(kwargs['scontext']) > 0 and
168                Rule.sctx not in kwargs['scontext']):
169            return False
170        # Match target type
171        if ("tcontext" in kwargs and
172                len(kwargs['tcontext']) > 0 and
173                Rule.tctx not in kwargs['tcontext']):
174            return False
175        # Match target class
176        if ("tclass" in kwargs and
177                len(kwargs['tclass']) > 0 and
178                not bool(set([Rule.tclass]) & kwargs['tclass'])):
179            return False
180        # Match any perms
181        if ("perms" in kwargs and
182                len(kwargs['perms']) > 0 and
183                not bool(Rule.perms & kwargs['perms'])):
184            return False
185        return True
186
187    # resolve a type to its attributes or
188    # resolve an attribute to its types and attributes
189    # For example if scontext is the domain attribute, then we need to
190    # include all types with the domain attribute such as untrusted_app and
191    # priv_app and all the attributes of those types such as appdomain.
192    def ResolveTypeAttribute(self, Type):
193        types = self.GetAllTypes(False)
194        attributes = self.GetAllTypes(True)
195
196        if Type in types:
197            return self.QueryTypeAttribute(Type, False)
198        elif Type in attributes:
199            TypesAndAttributes = set()
200            Types = self.QueryTypeAttribute(Type, True)
201            TypesAndAttributes |= Types
202            for T in Types:
203                TypesAndAttributes |= self.QueryTypeAttribute(T, False)
204            return TypesAndAttributes
205        else:
206            return set()
207
208    # Return all TERules that match:
209    # (any scontext) or (any tcontext) or (any tclass) or (any perms),
210    # perms.
211    # Any unspecified paramenter will match all.
212    #
213    # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"])
214    # Will return any rule with:
215    # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms)
216    def QueryTERule(self, **kwargs):
217        if len(self.__Rules) == 0:
218            self.__InitTERules()
219
220        # add any matching types and attributes for scontext and tcontext
221        if ("scontext" in kwargs and len(kwargs['scontext']) > 0):
222            scontext = set()
223            for sctx in kwargs['scontext']:
224                scontext |= self.ResolveTypeAttribute(sctx)
225            if (len(scontext) == 0):
226                return []
227            kwargs['scontext'] = scontext
228        if ("tcontext" in kwargs and len(kwargs['tcontext']) > 0):
229            tcontext = set()
230            for tctx in kwargs['tcontext']:
231                tcontext |= self.ResolveTypeAttribute(tctx)
232            if (len(tcontext) == 0):
233                return []
234            kwargs['tcontext'] = tcontext
235        for Rule in self.__Rules:
236            if self.__TERuleMatch(Rule, **kwargs):
237                yield Rule
238
239    # Same as QueryTERule but only using the expanded ruleset.
240    # i.e. all attributes have been expanded to their various types.
241    def QueryExpandedTERule(self, **kwargs):
242        if len(self.__ExpandedRules) == 0:
243            self.__InitExpandedTERules()
244        for Rule in self.__ExpandedRules:
245            if self.__TERuleMatch(Rule, **kwargs):
246                yield Rule
247
248    def GetAllTypes(self, isAttr):
249        TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, None, isAttr)
250        if (TypeIterP == None):
251            sys.exit("Failed to initialize type iterator")
252        buf = create_string_buffer(self.__BUFSIZE)
253        AllTypes = set()
254        while True:
255            ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE,
256                    self.__policydbP, TypeIterP)
257            if ret == 0:
258                AllTypes.add(buf.value.decode("ascii"))
259                continue
260            if ret == 1:
261                break;
262            # We should never get here.
263            sys.exit("Failed to import policy")
264        self.__libsepolwrap.destroy_type_iter(TypeIterP)
265        return AllTypes
266
267    def __ExactMatchPathPrefix(self, pathregex, prefix):
268        pattern = re.compile('^' + pathregex + "$")
269        if pattern.match(prefix):
270            return True
271        return False
272
273    # Return a tuple (prefix, i) where i is the index of the most specific
274    # match of prefix in the sorted file_contexts. This is useful for limiting a
275    # file_contexts search to matches that are more specific and omitting less
276    # specific matches. For example, finding all matches to prefix /data/vendor
277    # should not include /data(/.*)? if /data/vendor(/.*)? is also specified.
278    def __FcSortedIndex(self, prefix):
279        index = 0
280        for i in range(0, len(self.__FcSorted)):
281            if self.__ExactMatchPathPrefix(self.__FcSorted[i].path, prefix):
282                index = i
283        return prefix, index
284
285    # Return a tuple of (path, Type) for all matching paths. Use the sorted
286    # file_contexts and index returned from __FcSortedIndex() to limit results
287    # to results that are more specific than the prefix.
288    def __MatchPathPrefixTypes(self, prefix, index):
289        PathType = []
290        for i in range(index, len(self.__FcSorted)):
291            if MatchPathPrefix(self.__FcSorted[i].path, prefix):
292                PathType.append((self.__FcSorted[i].path, self.__FcSorted[i].type))
293        return PathType
294
295    # Return types that match MatchPrefixes but do not match
296    # DoNotMatchPrefixes
297    def __GetTypesAndFilesByFilePathPrefix(self, MatchPrefixes, DoNotMatchPrefixes):
298        Types = set()
299        Files = set()
300
301        MatchPrefixesWithIndex = []
302        for MatchPrefix in MatchPrefixes:
303            MatchPrefixesWithIndex.append(self.__FcSortedIndex(MatchPrefix))
304
305        for MatchPrefixWithIndex in MatchPrefixesWithIndex:
306            PathTypes = self.__MatchPathPrefixTypes(*MatchPrefixWithIndex)
307            for PathType in PathTypes:
308                if MatchPathPrefixes(PathType[0], DoNotMatchPrefixes):
309                    continue
310                Types.add(PathType[1])
311                Files.add(PathType[0])
312        return Types, Files
313
314    def __GetTERules(self, policydbP, avtabIterP, Rules):
315        if Rules is None:
316            Rules = set()
317        buf = create_string_buffer(self.__BUFSIZE)
318        ret = 0
319        while True:
320            ret = self.__libsepolwrap.get_allow_rule(buf, self.__BUFSIZE,
321                        policydbP, avtabIterP)
322            if ret == 0:
323                Rule = TERule(buf.value.decode("ascii"))
324                Rules.add(Rule)
325                continue
326            if ret == 1:
327                break;
328            # We should never get here.
329            sys.exit("Failed to import policy")
330
331    def __InitTERules(self):
332        avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP)
333        if (avtabIterP == None):
334            sys.exit("Failed to initialize avtab")
335        self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules)
336        self.__libsepolwrap.destroy_avtab(avtabIterP)
337        avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP)
338        if (avtabIterP == None):
339            sys.exit("Failed to initialize conditional avtab")
340        self.__GetTERules(self.__policydbP, avtabIterP, self.__Rules)
341        self.__libsepolwrap.destroy_avtab(avtabIterP)
342
343    def __InitExpandedTERules(self):
344        avtabIterP = self.__libsepolwrap.init_expanded_avtab(self.__policydbP)
345        if (avtabIterP == None):
346            sys.exit("Failed to initialize avtab")
347        self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules)
348        self.__libsepolwrap.destroy_expanded_avtab(avtabIterP)
349        avtabIterP = self.__libsepolwrap.init_expanded_cond_avtab(self.__policydbP)
350        if (avtabIterP == None):
351            sys.exit("Failed to initialize conditional avtab")
352        self.__GetTERules(self.__policydbP, avtabIterP, self.__ExpandedRules)
353        self.__libsepolwrap.destroy_expanded_avtab(avtabIterP)
354
355    # load ctypes-ified libsepol wrapper
356    def __InitLibsepolwrap(self, LibPath):
357        lib = CDLL(LibPath)
358
359        # int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp);
360        lib.get_allow_rule.restype = c_int
361        lib.get_allow_rule.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p];
362        # void *load_policy(const char *policy_path);
363        lib.load_policy.restype = c_void_p
364        lib.load_policy.argtypes = [c_char_p]
365        # void destroy_policy(void *policydbp);
366        lib.destroy_policy.argtypes = [c_void_p]
367        # void *init_expanded_avtab(void *policydbp);
368        lib.init_expanded_avtab.restype = c_void_p
369        lib.init_expanded_avtab.argtypes = [c_void_p]
370        # void *init_expanded_cond_avtab(void *policydbp);
371        lib.init_expanded_cond_avtab.restype = c_void_p
372        lib.init_expanded_cond_avtab.argtypes = [c_void_p]
373        # void destroy_expanded_avtab(void *avtab_iterp);
374        lib.destroy_expanded_avtab.argtypes = [c_void_p]
375        # void *init_avtab(void *policydbp);
376        lib.init_avtab.restype = c_void_p
377        lib.init_avtab.argtypes = [c_void_p]
378        # void *init_cond_avtab(void *policydbp);
379        lib.init_cond_avtab.restype = c_void_p
380        lib.init_cond_avtab.argtypes = [c_void_p]
381        # void destroy_avtab(void *avtab_iterp);
382        lib.destroy_avtab.argtypes = [c_void_p]
383        # int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp);
384        lib.get_type.restype = c_int
385        lib.get_type.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
386        # void *init_type_iter(void *policydbp, const char *type, bool is_attr);
387        lib.init_type_iter.restype = c_void_p
388        lib.init_type_iter.argtypes = [c_void_p, c_char_p, c_bool]
389        # void destroy_type_iter(void *type_iterp);
390        lib.destroy_type_iter.argtypes = [c_void_p]
391        # void *init_genfs_iter(void *policydbp)
392        lib.init_genfs_iter.restype = c_void_p
393        lib.init_genfs_iter.argtypes = [c_void_p]
394        # int get_genfs(char *out, size_t max_size, void *genfs_iterp);
395        lib.get_genfs.restype = c_int
396        lib.get_genfs.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
397        # void destroy_genfs_iter(void *genfs_iterp)
398        lib.destroy_genfs_iter.argtypes = [c_void_p]
399
400        self.__libsepolwrap = lib
401
402    def __GenfsDictAdd(self, Dict, buf):
403        fs, buf = buf.split(' ', 1)
404        path, context = buf.rsplit(' ', 1)
405        Type = context.split(":")[2]
406        if not fs in Dict:
407            Dict[fs] = {Type}
408        else:
409            Dict[fs].add(Type)
410
411    def __InitGenfsCon(self):
412        self.__GenfsDict = {}
413        GenfsIterP = self.__libsepolwrap.init_genfs_iter(self.__policydbP)
414        if (GenfsIterP == None):
415            sys.exit("Failed to retreive genfs entries")
416        buf = create_string_buffer(self.__BUFSIZE)
417        while True:
418            ret = self.__libsepolwrap.get_genfs(buf, self.__BUFSIZE,
419                        self.__policydbP, GenfsIterP)
420            if ret == 0:
421                self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii"))
422                continue
423            if ret == 1:
424                self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii"))
425                break;
426            # We should never get here.
427            sys.exit("Failed to get genfs entries")
428        self.__libsepolwrap.destroy_genfs_iter(GenfsIterP)
429
430    # load file_contexts
431    def __InitFC(self, FcPaths):
432        if FcPaths is None:
433            return
434        fc = []
435        for path in FcPaths:
436            if not os.path.exists(path):
437                sys.exit("file_contexts file " + path + " does not exist.")
438            fd = open(path, "r")
439            fc += fd.readlines()
440            fd.close()
441        self.__FcDict = {}
442        for i in fc:
443            rec = i.split()
444            try:
445                t = rec[-1].split(":")[2]
446                if t in self.__FcDict:
447                    self.__FcDict[t].append(rec[0])
448                else:
449                    self.__FcDict[t] = [rec[0]]
450            except:
451                pass
452        self.__FcSorted = fc_sort.sort(FcPaths)
453
454    # load policy
455    def __InitPolicy(self, PolicyPath):
456        cPolicyPath = create_string_buffer(PolicyPath.encode("ascii"))
457        self.__policydbP = self.__libsepolwrap.load_policy(cPolicyPath)
458        if (self.__policydbP is None):
459            sys.exit("Failed to load policy")
460
461    def __init__(self, PolicyPath, FcPaths, LibPath):
462        self.__InitLibsepolwrap(LibPath)
463        self.__InitFC(FcPaths)
464        self.__InitPolicy(PolicyPath)
465        self.__InitGenfsCon()
466
467    def __del__(self):
468        if self.__policydbP is not None:
469            self.__libsepolwrap.destroy_policy(self.__policydbP)
470