• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15from ctypes import *
16import re
17import os
18import sys
19import platform
20import fc_sort
21
###
# Check whether the regex will match a file path starting with the provided
# prefix.
#
# Compares regex entries in file_contexts with a path prefix. Regex entries
# are often more specific than this file prefix. For example, the regex could
# be /system/bin/foo\.sh and the prefix could be /system. This function
# loops over the regex, removing characters from the end, until either
# 1) there is a match - return True, or 2) it runs out of characters - return
#    False.
#
def MatchPathPrefix(pathregex, prefix):
    """Return True if some leading part of pathregex fully matches prefix.

    The regex is tried whole first and then with one more trailing character
    removed on each iteration, each time anchored with '^' and '$'. For
    example the regex /system/bin/foo\\.sh matches the prefix /system once it
    has been shortened to "/system".

    Args:
        pathregex: regular expression string (e.g. a file_contexts path).
        prefix: literal path prefix to test against.

    Returns:
        True on the first truncation that matches prefix; False if no
        truncation matches (including when pathregex is empty).
    """
    for end in range(len(pathregex), 0, -1):
        try:
            pattern = re.compile('^' + pathregex[:end] + "$")
        except (re.error, OverflowError):
            # Truncating can cut a group, class or escape in half and leave
            # an invalid regex; skip that fragment and keep shortening.
            # Only regex compilation errors are ignored so that
            # KeyboardInterrupt/SystemExit still propagate.
            continue
        if pattern.match(prefix):
            return True
    return False
42
def MatchPathPrefixes(pathregex, Prefixes):
    """Return True if pathregex matches at least one entry of Prefixes.

    Each prefix is checked with MatchPathPrefix(); evaluation stops at the
    first hit. An empty Prefixes yields False.
    """
    return any(MatchPathPrefix(pathregex, candidate) for candidate in Prefixes)
48
class TERule:
    """A single rule parsed from a comma-separated text record.

    The record is split on ',' and its first five fields are stored as
    flavor, sctx, tctx, tclass and perms. perms is the fifth field with
    surrounding whitespace stripped, split on single spaces and stored as a
    set. The unparsed text is kept in rule.
    """

    def __init__(self, rule):
        fields = rule.split(',')
        self.flavor, self.sctx, self.tctx, self.tclass = (
            fields[0], fields[1], fields[2], fields[3])
        stripped_perms = fields[4].strip()
        self.perms = set(stripped_perms.split(' '))
        self.rule = rule
58
class Policy:
    """Query helper for a loaded policy, its file_contexts and genfs entries.

    All policy access goes through a ctypes-loaded wrapper library (see
    __InitLibsepolwrap). file_contexts files are parsed in Python and also
    handed to fc_sort.sort() to obtain a sorted list of entries.
    """

    # Rule caches are filled lazily on first query. They start as None (not a
    # class-level set()) so that each instance builds its own cache instead of
    # every Policy object sharing, and appending to, one set.
    __ExpandedRules = None
    __Rules = None
    # type -> list of file_contexts path regexes (None until __InitFC runs).
    __FcDict = None
    # Result of fc_sort.sort(); entries are read via .path and .type.
    __FcSorted = None
    # genfs filesystem name -> set of types.
    __GenfsDict = None
    __libsepolwrap = None
    __policydbP = None
    # Size of the scratch buffer handed to the wrapper's get_* functions.
    __BUFSIZE = 2048

    def __PathViolationMessage(self, MatchPrefix, Requirement, Attr,
                               violators, Files):
        """Format the error text shared by the path/attribute assertions.

        Requirement is the verb phrase placed before "associated with"
        ("must be" or "must not be"). Returns "" when violators is empty.
        """
        if not violators:
            return ""
        return "".join([
            "The following types on ",
            " ".join(str(x) for x in sorted(MatchPrefix)),
            " ", Requirement, " associated with the ",
            "\"", Attr, "\" attribute: ",
            " ".join(str(x) for x in sorted(violators)), "\n",
            " corresponding to files: ",
            " ".join(str(x) for x in sorted(Files)), "\n",
        ])

    def AssertPathTypesDoNotHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr,
                                     ExcludedTypes=()):
        """Check that no type on the matched paths carries attribute Attr.

        Args:
            MatchPrefix: iterable of path prefixes to include.
            DoNotMatchPrefix: iterable of path prefixes to exclude.
            Attr: attribute name to look up in the policy.
            ExcludedTypes: types to ignore even if they carry Attr.

        Returns:
            "" when there are no violations, otherwise a message listing the
            offending types and the file_contexts paths that were searched.
        """
        # Query policy for the types associated with Attr
        TypesPol = self.QueryTypeAttribute(Attr, True) - set(ExcludedTypes)
        # Search file_contexts to find types associated with input paths.
        TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(
            MatchPrefix, DoNotMatchPrefix)
        violators = TypesFc.intersection(TypesPol)
        return self.__PathViolationMessage(
            MatchPrefix, "must not be", Attr, violators, Files)

    def AssertGenfsFilesystemTypesHaveAttr(self, Filesystem, Attr):
        """Check that every genfs type of Filesystem carries attribute Attr.

        Returns "" when all types recorded for Filesystem have Attr, otherwise
        a message listing the types that do not. Raises KeyError if no genfs
        entry was recorded for Filesystem.
        """
        TypesPol = self.QueryTypeAttribute(Attr, True)
        TypesGenfs = self.__GenfsDict[Filesystem]
        violators = TypesGenfs.difference(TypesPol)
        if not violators:
            return ""
        return ("The following types in " + Filesystem
                + " must be associated with the "
                + "\"" + Attr + "\" attribute: "
                + " ".join(str(x) for x in sorted(violators)) + "\n")

    def AssertPathTypesHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr):
        """Check that every type on the matched paths carries attribute Attr.

        Paths are those matching MatchPrefix and not matching
        DoNotMatchPrefix; for example, assert that all types in /sys, and not
        in /sys/kernel/debugfs, have the sysfs_type attribute.

        Returns:
            "" when there are no violations, otherwise a message listing the
            offending types and the file_contexts paths that were searched.
        """
        # Query policy for the types associated with Attr
        TypesPol = self.QueryTypeAttribute(Attr, True)
        # Search file_contexts to find paths/types that should be associated
        # with Attr.
        TypesFc, Files = self.__GetTypesAndFilesByFilePathPrefix(
            MatchPrefix, DoNotMatchPrefix)
        violators = TypesFc.difference(TypesPol)
        return self.__PathViolationMessage(
            MatchPrefix, "must be", Attr, violators, Files)

    def AssertPropertyOwnersAreExclusive(self):
        """Check that no type is both system_property_type and
        vendor_property_type.

        Returns "" when the two sets are disjoint, otherwise a message
        listing the types that carry both attributes.
        """
        systemProps = self.QueryTypeAttribute('system_property_type', True)
        vendorProps = self.QueryTypeAttribute('vendor_property_type', True)
        violators = systemProps.intersection(vendorProps)
        if not violators:
            return ""
        return ("The following types have both system_property_type "
                + "and vendor_property_type: "
                + " ".join(str(x) for x in sorted(violators)) + "\n")

    def QueryFc(self, Type):
        """Return the list of file_contexts paths labeled with Type.

        Returns None when Type has no entry, or when no file_contexts were
        loaded at all.
        """
        if self.__FcDict is None:
            return None
        return self.__FcDict.get(Type)

    def __DrainTypeIter(self, TypeIterP):
        """Collect every name produced by a type iterator, then destroy it.

        get_type() is called until it returns 1 (done); each 0 return
        contributes the buffer contents. Any other return value aborts the
        process via sys.exit().
        """
        buf = create_string_buffer(self.__BUFSIZE)
        Names = set()
        while True:
            ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE,
                                               self.__policydbP, TypeIterP)
            if ret == 0:
                Names.add(buf.value.decode("ascii"))
                continue
            if ret == 1:
                break
            # We should never get here.
            sys.exit("Failed to import policy")
        self.__libsepolwrap.destroy_type_iter(TypeIterP)
        return Names

    def QueryTypeAttribute(self, Type, IsAttr):
        """Return the attributes of a type, or the types of an attribute.

        Args:
            Type: name of the type (IsAttr=False) or attribute (IsAttr=True).
            IsAttr: selects which direction to resolve.

        Returns:
            A set of names. Exits the process if the iterator cannot be
            created.
        """
        TypeIterP = self.__libsepolwrap.init_type_iter(
            self.__policydbP, create_string_buffer(Type.encode("ascii")),
            IsAttr)
        if TypeIterP is None:
            sys.exit("Failed to initialize type iterator")
        return self.__DrainTypeIter(TypeIterP)

    def __TERuleMatch(self, Rule, **kwargs):
        """Return True if Rule passes every filter given in kwargs.

        Recognized keys are scontext, tcontext, tclass and perms. A filter
        that is missing or empty matches everything. scontext/tcontext/tclass
        match when the rule's value is contained in the filter; perms matches
        when the rule shares at least one permission with the filter. Filters
        may be any container (set, list, tuple), not only sets.
        """
        # Match source type
        scontext = kwargs.get("scontext")
        if scontext and Rule.sctx not in scontext:
            return False
        # Match target type
        tcontext = kwargs.get("tcontext")
        if tcontext and Rule.tctx not in tcontext:
            return False
        # Match target class
        tclass = kwargs.get("tclass")
        if tclass and Rule.tclass not in tclass:
            return False
        # Match any perms
        perms = kwargs.get("perms")
        if perms and Rule.perms.isdisjoint(perms):
            return False
        return True

    def ResolveTypeAttribute(self, Type):
        """Resolve a type to its attributes, or an attribute to its types
        plus all attributes of those types.

        For example if Type is the domain attribute, the result includes all
        types with the domain attribute such as untrusted_app and priv_app and
        all the attributes of those types such as appdomain. Returns an empty
        set when Type is neither a known type nor a known attribute.
        """
        if Type in self.GetAllTypes(False):
            return self.QueryTypeAttribute(Type, False)
        # Only enumerate attributes when Type was not a plain type.
        if Type not in self.GetAllTypes(True):
            return set()
        Types = self.QueryTypeAttribute(Type, True)
        TypesAndAttributes = set(Types)
        for T in Types:
            TypesAndAttributes |= self.QueryTypeAttribute(T, False)
        return TypesAndAttributes

    def QueryTERule(self, **kwargs):
        """Yield every TERule matching all of the supplied filters.

        Filters are scontext, tcontext, tclass and perms; each is a
        collection of acceptable values and any unspecified (or empty)
        parameter matches all rules. Within one filter any value may match;
        different filters are combined with AND. scontext and tcontext are
        first expanded through ResolveTypeAttribute().

        Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"])
        yields any rule with
        (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms).
        """
        if not self.__Rules:
            self.__InitTERules()

        # add any matching types and attributes for scontext and tcontext
        for key in ("scontext", "tcontext"):
            if kwargs.get(key):
                resolved = set()
                for ctx in kwargs[key]:
                    resolved |= self.ResolveTypeAttribute(ctx)
                # NOTE(review): if nothing resolves, the filter becomes an
                # empty set and therefore matches every rule - confirm this
                # is intended.
                kwargs[key] = resolved
        for Rule in self.__Rules:
            if self.__TERuleMatch(Rule, **kwargs):
                yield Rule

    def QueryExpandedTERule(self, **kwargs):
        """Same as QueryTERule but over the expanded ruleset.

        The expanded rules come from the wrapper's init_expanded_* iterators;
        scontext/tcontext filters are used as given, without being resolved
        through ResolveTypeAttribute().
        """
        if not self.__ExpandedRules:
            self.__InitExpandedTERules()
        for Rule in self.__ExpandedRules:
            if self.__TERuleMatch(Rule, **kwargs):
                yield Rule

    def GetAllTypes(self, isAttr):
        """Return the set of all attribute names (isAttr=True) or all type
        names (isAttr=False) in the policy.

        Exits the process if the iterator cannot be created.
        """
        TypeIterP = self.__libsepolwrap.init_type_iter(
            self.__policydbP, None, isAttr)
        if TypeIterP is None:
            sys.exit("Failed to initialize type iterator")
        return self.__DrainTypeIter(TypeIterP)

    def __ExactMatchPathPrefix(self, pathregex, prefix):
        """Return True if the whole pathregex, anchored, matches prefix."""
        pattern = re.compile('^' + pathregex + "$")
        return bool(pattern.match(prefix))

    def __FcSortedIndex(self, prefix):
        """Return (prefix, i) where i is the index of the last entry in the
        sorted file_contexts whose regex matches prefix exactly.

        This is useful for limiting a file_contexts search to matches that
        are more specific and omitting less specific matches. For example,
        finding all matches to prefix /data/vendor should not include
        /data(/.*)? if /data/vendor(/.*)? is also specified. The index is 0
        when no entry matches.
        """
        index = 0
        for i, entry in enumerate(self.__FcSorted):
            if self.__ExactMatchPathPrefix(entry.path, prefix):
                index = i
        return prefix, index

    def __MatchPathPrefixTypes(self, prefix, index):
        """Return [(path, type), ...] for sorted file_contexts entries at or
        after index whose regex matches prefix per MatchPathPrefix().

        index is normally the value returned by __FcSortedIndex(), which
        limits results to entries sorted at or after that match.
        """
        return [(entry.path, entry.type)
                for entry in self.__FcSorted[index:]
                if MatchPathPrefix(entry.path, prefix)]

    def __GetTypesAndFilesByFilePathPrefix(self, MatchPrefixes,
                                           DoNotMatchPrefixes):
        """Return (Types, Files) for entries that match MatchPrefixes but do
        not match DoNotMatchPrefixes.

        Types is the set of types and Files the set of path regexes of the
        selected file_contexts entries.
        """
        Types = set()
        Files = set()
        for MatchPrefix in MatchPrefixes:
            prefix, index = self.__FcSortedIndex(MatchPrefix)
            for path, fctype in self.__MatchPathPrefixTypes(prefix, index):
                if MatchPathPrefixes(path, DoNotMatchPrefixes):
                    continue
                Types.add(fctype)
                Files.add(path)
        return Types, Files

    def __GetTERules(self, policydbP, avtabIterP, Rules):
        """Read every rule from avtabIterP into the set Rules.

        get_allow_rule() is called until it returns 1 (done); each 0 return
        is parsed into a TERule. Any other return value aborts the process.
        Rules is updated in place and also returned; if None is passed a new
        set is created.
        """
        if Rules is None:
            Rules = set()
        buf = create_string_buffer(self.__BUFSIZE)
        while True:
            ret = self.__libsepolwrap.get_allow_rule(buf, self.__BUFSIZE,
                                                     policydbP, avtabIterP)
            if ret == 0:
                Rules.add(TERule(buf.value.decode("ascii")))
                continue
            if ret == 1:
                break
            # We should never get here.
            sys.exit("Failed to import policy")
        return Rules

    def __LoadAvtab(self, InitIter, DestroyIter, FailMsg, Rules):
        """Create one avtab iterator, read its rules into Rules, destroy it.

        Exits the process with FailMsg if the iterator cannot be created.
        """
        avtabIterP = InitIter(self.__policydbP)
        if avtabIterP is None:
            sys.exit(FailMsg)
        self.__GetTERules(self.__policydbP, avtabIterP, Rules)
        DestroyIter(avtabIterP)

    def __InitTERules(self):
        """Populate this instance's rule cache from the avtab and the
        conditional avtab."""
        lib = self.__libsepolwrap
        # Fresh per-instance set; never write into a class-level container.
        self.__Rules = set()
        self.__LoadAvtab(lib.init_avtab, lib.destroy_avtab,
                         "Failed to initialize avtab", self.__Rules)
        self.__LoadAvtab(lib.init_cond_avtab, lib.destroy_avtab,
                         "Failed to initialize conditional avtab",
                         self.__Rules)

    def __InitExpandedTERules(self):
        """Populate this instance's expanded-rule cache from the expanded
        avtab and the expanded conditional avtab."""
        lib = self.__libsepolwrap
        # Fresh per-instance set; never write into a class-level container.
        self.__ExpandedRules = set()
        self.__LoadAvtab(lib.init_expanded_avtab, lib.destroy_expanded_avtab,
                         "Failed to initialize avtab", self.__ExpandedRules)
        self.__LoadAvtab(lib.init_expanded_cond_avtab,
                         lib.destroy_expanded_avtab,
                         "Failed to initialize conditional avtab",
                         self.__ExpandedRules)

    # load ctypes-ified libsepol wrapper
    def __InitLibsepolwrap(self, LibPath):
        """Load the wrapper library at LibPath and declare the argument and
        return types of every function this class calls."""
        lib = CDLL(LibPath)

        # int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp);
        lib.get_allow_rule.restype = c_int
        lib.get_allow_rule.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
        # void *load_policy(const char *policy_path);
        lib.load_policy.restype = c_void_p
        lib.load_policy.argtypes = [c_char_p]
        # void destroy_policy(void *policydbp);
        lib.destroy_policy.argtypes = [c_void_p]
        # void *init_expanded_avtab(void *policydbp);
        lib.init_expanded_avtab.restype = c_void_p
        lib.init_expanded_avtab.argtypes = [c_void_p]
        # void *init_expanded_cond_avtab(void *policydbp);
        lib.init_expanded_cond_avtab.restype = c_void_p
        lib.init_expanded_cond_avtab.argtypes = [c_void_p]
        # void destroy_expanded_avtab(void *avtab_iterp);
        lib.destroy_expanded_avtab.argtypes = [c_void_p]
        # void *init_avtab(void *policydbp);
        lib.init_avtab.restype = c_void_p
        lib.init_avtab.argtypes = [c_void_p]
        # void *init_cond_avtab(void *policydbp);
        lib.init_cond_avtab.restype = c_void_p
        lib.init_cond_avtab.argtypes = [c_void_p]
        # void destroy_avtab(void *avtab_iterp);
        lib.destroy_avtab.argtypes = [c_void_p]
        # int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp);
        lib.get_type.restype = c_int
        lib.get_type.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
        # void *init_type_iter(void *policydbp, const char *type, bool is_attr);
        lib.init_type_iter.restype = c_void_p
        lib.init_type_iter.argtypes = [c_void_p, c_char_p, c_bool]
        # void destroy_type_iter(void *type_iterp);
        lib.destroy_type_iter.argtypes = [c_void_p]
        # void *init_genfs_iter(void *policydbp)
        lib.init_genfs_iter.restype = c_void_p
        lib.init_genfs_iter.argtypes = [c_void_p]
        # int get_genfs(char *out, size_t max_size, void *policydbp, void *genfs_iterp);
        # NOTE(review): four parameters are declared here and passed by
        # __InitGenfsCon; confirm the prototype against the C wrapper.
        lib.get_genfs.restype = c_int
        lib.get_genfs.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
        # void destroy_genfs_iter(void *genfs_iterp)
        lib.destroy_genfs_iter.argtypes = [c_void_p]

        self.__libsepolwrap = lib

    def __GenfsDictAdd(self, Dict, buf):
        """Record one genfs entry in Dict (filesystem -> set of types).

        buf is split as "<fs> <path> <context>": the first space ends the
        filesystem name and the last space starts the context; the type is
        the third ':'-separated field of the context.
        """
        fs, rest = buf.split(' ', 1)
        _path, context = rest.rsplit(' ', 1)
        Type = context.split(":")[2]
        Dict.setdefault(fs, set()).add(Type)

    def __InitGenfsCon(self):
        """Build __GenfsDict from the wrapper's genfs iterator.

        Exits the process if the iterator cannot be created or get_genfs()
        returns an unexpected value.
        """
        self.__GenfsDict = {}
        GenfsIterP = self.__libsepolwrap.init_genfs_iter(self.__policydbP)
        if GenfsIterP is None:
            sys.exit("Failed to retreive genfs entries")
        buf = create_string_buffer(self.__BUFSIZE)
        while True:
            ret = self.__libsepolwrap.get_genfs(buf, self.__BUFSIZE,
                                                self.__policydbP, GenfsIterP)
            if ret not in (0, 1):
                # We should never get here.
                sys.exit("Failed to get genfs entries")
            # The buffer is consumed for both return values: 0 means more
            # entries follow, 1 ends the loop after this entry is recorded.
            self.__GenfsDictAdd(self.__GenfsDict, buf.value.decode("ascii"))
            if ret == 1:
                break
        self.__libsepolwrap.destroy_genfs_iter(GenfsIterP)

    # load file_contexts
    def __InitFC(self, FcPaths):
        """Load file_contexts files into __FcDict and __FcSorted.

        Does nothing when FcPaths is None. Exits the process if any listed
        file does not exist. Lines whose last whitespace-separated field does
        not have at least three ':'-separated parts are skipped.
        """
        if FcPaths is None:
            return
        fc = []
        for path in FcPaths:
            if not os.path.exists(path):
                sys.exit("file_contexts file " + path + " does not exist.")
            with open(path, "r") as fd:
                fc += fd.readlines()
        self.__FcDict = {}
        for line in fc:
            rec = line.split()
            try:
                t = rec[-1].split(":")[2]
            except IndexError:
                # Blank line or a last field that is not a full context.
                continue
            self.__FcDict.setdefault(t, []).append(rec[0])
        self.__FcSorted = fc_sort.sort(FcPaths)

    # load policy
    def __InitPolicy(self, PolicyPath):
        """Load the policy file at PolicyPath through the wrapper library.

        Exits the process if load_policy() returns NULL.
        """
        cPolicyPath = create_string_buffer(PolicyPath.encode("ascii"))
        self.__policydbP = self.__libsepolwrap.load_policy(cPolicyPath)
        if self.__policydbP is None:
            sys.exit("Failed to load policy")

    def __init__(self, PolicyPath, FcPaths, LibPath):
        """Load the wrapper library, file_contexts, policy and genfs entries.

        Args:
            PolicyPath: path of the policy file passed to load_policy().
            FcPaths: list of file_contexts paths, or None to skip them.
            LibPath: path of the wrapper shared library to load.
        """
        self.__InitLibsepolwrap(LibPath)
        self.__InitFC(FcPaths)
        self.__InitPolicy(PolicyPath)
        self.__InitGenfsCon()

    def __del__(self):
        """Release the loaded policy, if one was loaded."""
        if self.__policydbP is not None:
            self.__libsepolwrap.destroy_policy(self.__policydbP)
466