"""Parser for a flattened SELinux policy.conf file.

The module reads type, attribute, typeattribute, class, common, allow and
neverallow statements into an SELinuxPolicy object, and can expand the stored
avc rules either into (source, target, class, permission) tuples or into an
XML element.

All diagnostics are written with single-argument print(...) calls so the
output is identical under Python 2 and Python 3.
"""
import io
import pdb
import re
from xml.etree.ElementTree import Element, SubElement, tostring

# "#define" equivalents: line-type codes returned by get_line_type().
TYPE = 0
ATTRIBUTE = 1
TYPEATTRIBUTE = 2
CLASS = 3
COMMON = 4
ALLOW_RULE = 5
NEVERALLOW_RULE = 6
OTHER = 7

# Statement prefixes, tested in this order by get_line_type().
_LINE_TYPE_PATTERNS = (
    (re.compile(r'^type\s'), TYPE),
    (re.compile(r'^attribute\s'), ATTRIBUTE),
    (re.compile(r'^typeattribute\s'), TYPEATTRIBUTE),
    (re.compile(r'^class\s'), CLASS),
    (re.compile(r'^common\s'), COMMON),
    (re.compile(r'^allow\s'), ALLOW_RULE),
    (re.compile(r'^neverallow\s'), NEVERALLOW_RULE),
)

# Line types whose statement may continue past the line that introduced it.
_MULTI_LINE_TYPES = frozenset([CLASS, COMMON, ALLOW_RULE, NEVERALLOW_RULE])


def _rewind(file_obj, count):
    """Move file_obj back by count characters from its current position.

    An absolute seek computed from tell() is used instead of seek(-count, 1)
    because text streams on Python 3 reject non-zero relative seeks.
    """
    if count > 0:
        file_obj.seek(max(file_obj.tell() - count, 0))


def _load_policy_buffer(policy_file_name):
    """Read the whole policy file and return it as a seekable in-memory buffer.

    The parser steps backwards by character counts; an in-memory buffer makes
    that well defined on Python 3 as well as Python 2.
    """
    with open(policy_file_name, 'r') as policy_file:
        contents = policy_file.read()
    try:
        return io.StringIO(contents)
    except TypeError:
        # Python 2: open() produced a byte string, which StringIO refuses.
        return io.BytesIO(contents)


def advance_past_whitespace(file_obj):
    """Skip whitespace at the current position of file_obj.

    Returns the first non-whitespace character, which is left unread (the
    position is moved back onto it), or '' when the end of the input is hit.
    """
    #TODO: should I deal with comments here as well?
    c = file_obj.read(1)
    while c.isspace():
        c = file_obj.read(1)
    if c:
        # Only step back when a character was actually consumed; at EOF there
        # is nothing to un-read.
        _rewind(file_obj, 1)
    return c


def advance_until_whitespace(file_obj):
    """Return the word starting at the current position of file_obj.

    Reading stops at whitespace, ':', ';' or end of input.  A terminating ':'
    is consumed; whitespace and ';' are left unread.  When a '#' is met the
    rest of that line is discarded and the text gathered so far is returned.
    """
    chars = []
    c = file_obj.read(1)
    #TODO: make a better way to deal with ':' and ';'
    while c not in ('', ':', ';') and not c.isspace():
        if c == '#':
            # don't count comments
            file_obj.readline()
            return ''.join(chars)
        chars.append(c)
        c = file_obj.read(1)
    if c and c != ':':
        _rewind(file_obj, 1)
    return ''.join(chars)


def _expand_rule_types(policy, avc_rule, key):
    """Expand avc_rule[key] ('source_types' or 'target_types') via policy.

    Applies the component's 'complement' flag.  Returns the resulting set, or
    None after printing a diagnostic when expansion failed or produced nothing.
    """
    component = avc_rule[key]
    expanded = policy.expand_types(component['set'])
    if expanded is None:
        # expand_types() has already reported the offending name.
        return None
    if component['flags']['complement']:
        #TODO: deal with negated 'self', not present in current policy.conf,
        # though (I think)
        expanded = policy.types - expanded
    if len(expanded) == 0:
        print("ERROR: " + key + " empty after expansion")
        print("Before: ")
        print(component['set'])
        return None
    return expanded


def _expand_rule_permissions(policy, avc_rule, obj_class):
    """Expand the rule's permissions for one object class.

    Applies the 'complement' flag against policy.classes[obj_class].  Returns
    the resulting set, or None after printing a diagnostic when expansion
    failed or produced nothing.
    """
    component = avc_rule['permissions']
    expanded = policy.expand_permissions(obj_class, component['set'])
    if expanded is None:
        # expand_permissions() has already reported the problem.
        return None
    if component['flags']['complement']:
        expanded = policy.classes[obj_class] - expanded
    if len(expanded) == 0:
        print("ERROR: permissions empty after expansion\n")
        print("Before: ")
        print(component['set'])
        return None
    return expanded


def expand_avc_rule(policy, avc_rule):
    """Convert a processed avc rule into a list of access-check 4-tuples.

    Each tuple has the form (source_type, target_type, class, permission).  A
    target of 'self' is replaced by the source of the same tuple.  Empty class
    names are skipped.

    Returns None (after printing a diagnostic) when any component of the rule
    is empty or cannot be expanded.
    """
    source_types = _expand_rule_types(policy, avc_rule, 'source_types')
    if source_types is None:
        return None
    target_types = _expand_rule_types(policy, avc_rule, 'target_types')
    if target_types is None:
        return None

    # get classes
    rule_classes = avc_rule['classes']['set']
    if '' in rule_classes:
        print("FOUND EMPTY STRING IN CLASSES")
        print("Total sets:")
        print(avc_rule['source_types']['set'])
        print(avc_rule['target_types']['set'])
        print(rule_classes)
        print(avc_rule['permissions']['set'])
    if len(rule_classes) == 0:
        print("ERROR: empy set of object classes in avc rule")
        return None

    # get permissions
    if len(avc_rule['permissions']['set']) == 0:
        print("ERROR: empy set of permissions in avc rule\n")
        return None

    # Permissions depend only on the class, so expand them once per class
    # rather than once per (source, target) pair.
    class_permissions = []
    for c in rule_classes:
        if c == '':
            continue
        exp_permissions = _expand_rule_permissions(policy, avc_rule, c)
        if exp_permissions is None:
            return None
        class_permissions.append((c, exp_permissions))

    ret_list = []
    for s in source_types:
        for t in target_types:
            target = s if t == 'self' else t
            for c, exp_permissions in class_permissions:
                for p in exp_permissions:
                    ret_list.append((s, target, c, p))
    return ret_list


def expand_avc_rule_to_xml(policy, avc_rule, rule_name, rule_type):
    """Convert a processed avc rule into an XML element.

    The returned 'avc_rule' element carries 'name' and 'type' attributes, one
    'type' child per expanded source type (attribute type="source") and target
    type (type="target"), and one 'obj_class' child per class holding a
    'permission' child for every expanded permission.

    Returns None (after printing a diagnostic) when any component of the rule
    is empty or cannot be expanded.
    """
    rule_xml = Element('avc_rule')
    rule_xml.set('name', rule_name)
    rule_xml.set('type', rule_type)

    for key, role in (('source_types', 'source'), ('target_types', 'target')):
        expanded = _expand_rule_types(policy, avc_rule, key)
        if expanded is None:
            return None
        for type_name in expanded:
            elem = SubElement(rule_xml, 'type')
            elem.set('type', role)
            elem.text = type_name

    # get classes
    rule_classes = avc_rule['classes']['set']
    if len(rule_classes) == 0:
        print("ERROR: empy set of object classes in avc rule")
        return None

    # get permissions
    if len(avc_rule['permissions']['set']) == 0:
        print("ERROR: empy set of permissions in avc rule\n")
        return None

    # permissions are class-dependent, so bundled together
    for c in rule_classes:
        if c == '':
            print("AH!!! empty class found!\n")
            continue
        c_elem = SubElement(rule_xml, 'obj_class')
        c_elem.set('name', c)
        exp_permissions = _expand_rule_permissions(policy, avc_rule, c)
        if exp_permissions is None:
            return None
        for p in exp_permissions:
            p_elem = SubElement(c_elem, 'permission')
            p_elem.text = p
    return rule_xml


def expand_brackets(file_obj):
    """Read file_obj into a string until the '{ }'s are balanced.

    file_obj must be positioned on a '{'.  All brackets are dropped from the
    returned text and every '#' comment is replaced by a single space.  Nested
    brackets are assumed to come only from macro expansion, so only the extent
    of the bracketed region matters.

    Returns "" (leaving the offending character unread) when the first
    character is not '{'.  When the input ends before the brackets balance, a
    diagnostic is printed and the text gathered so far is returned.
    """
    c = file_obj.read(1)
    if c != '{':
        print("Invalid bracket expression: " + c + "\n")
        if c:
            _rewind(file_obj, 1)
        return ""

    pieces = []
    bracket_count = 1
    while bracket_count > 0:
        c = file_obj.read(1)
        if c == '':
            # End of input: without this check the loop would never finish.
            print("ERROR: unbalanced brackets, reached end of input")
            break
        if c == '{':
            bracket_count += 1
        elif c == '}':
            bracket_count -= 1
        elif c == '#':
            # get rid of comment and replace with whitespace
            file_obj.readline()
            pieces.append(' ')
        else:
            pieces.append(c)
    return ''.join(pieces)


def get_avc_rule_component(file_obj):
    """Grab the next component (word or bracketed word list) of an avc rule.

    Returns a dict {'flags': {'complement': bool}, 'set': set_of_words}.  The
    'complement' flag is True when the component is prefixed with '~'.  For an
    unbracketed component the single word is added even when it is empty.
    """
    ret_dict = {'flags': {}, 'set': set()}
    c = advance_past_whitespace(file_obj)
    if c == '~':
        ret_dict['flags']['complement'] = True
        file_obj.read(1)  # move past the '~'
        c = advance_past_whitespace(file_obj)
    else:
        ret_dict['flags']['complement'] = False

    if c == '{':
        ret_dict['set'].update(expand_brackets(file_obj).split())
    else:
        #TODO: change operations on file to operations on string?
        ret_dict['set'].add(advance_until_whitespace(file_obj))
    return ret_dict


def get_line_type(line):
    """Classify line by its leading keyword; returns one of the type codes."""
    for pattern, line_type in _LINE_TYPE_PATTERNS:
        if pattern.match(line):
            return line_type
    return OTHER


def is_multi_line(line_type):
    """Return True for CLASS, COMMON, ALLOW_RULE and NEVERALLOW_RULE."""
    return line_type in _MULTI_LINE_TYPES


def process_inherits_segment(file_obj):
    """Parse an 'inherits <common>' segment and return the common's name.

    Should only be called with file_obj positioned on the 'i' of 'inherits'.
    Returns None after printing a diagnostic when the keyword is not found.
    """
    inherit_keyword = file_obj.read(8)
    if inherit_keyword != 'inherits':
        #TODO: handle error, invalid class statement
        print("ERROR: invalid inherits statement")
        return None
    advance_past_whitespace(file_obj)
    return advance_until_whitespace(file_obj)


class SELinuxPolicy:
    """In-memory model of the parts of policy.conf needed for access checks.

    Attributes:
        types: set of declared type names.
        attributes: dict mapping attribute name -> set of member type names.
        classes: dict mapping class name -> set of permission names.
        common_classes: dict mapping common name -> set of permission names.
        allow_rules / neverallow_rules: lists of rule dicts with the keys
            'source_types', 'target_types', 'classes' and 'permissions', each
            holding a component as returned by get_avc_rule_component().
    """

    def __init__(self):
        self._reset_state()

    def _reset_state(self):
        """Drop everything parsed so far."""
        self.types = set()
        self.attributes = {}
        self.classes = {}
        self.common_classes = {}
        self.allow_rules = []
        self.neverallow_rules = []

    # create policy directly from policy file
    #@classmethod
    def from_file_name(self, policy_file_name):
        """Replace this policy's contents with those parsed from a file."""
        self._reset_state()
        policy_file = _load_policy_buffer(policy_file_name)
        # readline() is used rather than iteration because the multi-line
        # handlers reposition the buffer while a statement is being parsed.
        line = policy_file.readline()
        while line:
            line_type = get_line_type(line)
            if is_multi_line(line_type):
                self.parse_multi_line(line, line_type, policy_file)
            else:
                self.parse_single_line(line, line_type)
            line = policy_file.readline()

    def expand_permissions(self, obj_class, permission_set):
        """Generate the actual permission set for obj_class.

        Entries of permission_set may be plain permission names, '-name' to
        exclude a permission, or '*' for every permission of the class.

        Returns the resulting set, or None after printing a diagnostic when
        the class or one of the permissions is unknown.
        """
        if obj_class not in self.classes:
            print("ERROR: invalid class in avc rule " + str(obj_class) + "\n")
            return None
        class_permissions = self.classes[obj_class]
        ret_set = set()
        neg_set = set()
        for p in permission_set:
            if p.startswith('-'):
                real_p = p[1:]
                if real_p in class_permissions:
                    neg_set.add(real_p)
                else:
                    print("ERROR: invalid permission in avc rule " + real_p + "\n")
                    return None
            elif p in class_permissions:
                ret_set.add(p)
            elif p == '*':
                # pretty sure this can't be negated? eg -*
                ret_set |= class_permissions  # all of the permissions
            else:
                print("ERROR: invalid permission in avc rule " + p + "\n")
                return None
        return ret_set - neg_set

    def expand_types(self, type_set):
        """Generate the actual type set from types, attributes and wildcards.

        Entries may be type names, attribute names (expanded to their member
        types), '*' for every type, 'self', or any of the first two prefixed
        with '-' to exclude them.  'self' is left as-is; it is resolved when
        the access-check tuples are generated.

        Returns the resulting set, or None after printing a diagnostic when an
        entry is not a known type or attribute.
        """
        ret_set = set()
        neg_set = set()
        for t in type_set:
            if t.startswith('-'):
                real_t = t[1:]
                if real_t in self.attributes:
                    neg_set |= self.attributes[real_t]
                elif real_t in self.types:
                    neg_set.add(real_t)
                elif real_t == 'self':
                    # NOTE(review): negated 'self' is assumed to mean "drop the
                    # 'self' token"; not present in current policy.conf.
                    neg_set.add(real_t)
                else:
                    print("ERROR: invalid type in avc rule " + real_t + "\nTYPE SET:")
                    print(type_set)
                    return None
            elif t in self.attributes:
                ret_set |= self.attributes[t]
            elif t in self.types:
                ret_set.add(t)
            elif t == 'self':
                ret_set.add(t)
            elif t == '*':
                # pretty sure this can't be negated?
                ret_set |= self.types  # all of the types
            else:
                print("ERROR: invalid type in avc rule " + t + "\nTYPE SET")
                print(type_set)
                return None
        return ret_set - neg_set

    def parse_multi_line(self, line, line_type, file_obj):
        """Dispatch a statement that may span several lines to its handler."""
        if line_type == CLASS:
            self.process_class_line(line, file_obj)
        elif line_type == COMMON:
            self.process_common_line(line, file_obj)
        elif line_type in (ALLOW_RULE, NEVERALLOW_RULE):
            self.process_avc_rule_line(line, file_obj)
        else:
            print("Error: This is not a multi-line input")

    def parse_single_line(self, line, line_type):
        """Dispatch a one-line statement; other line types are ignored."""
        if line_type == TYPE:
            self.process_type_line(line)
        elif line_type == ATTRIBUTE:
            self.process_attribute_line(line)
        elif line_type == TYPEATTRIBUTE:
            self.process_typeattribute_line(line)

    def process_attribute_line(self, line):
        """Record the attribute declared by an 'attribute <name>;' line."""
        match = re.search(r'^attribute\s+(.+);', line)
        if match:
            self.attributes[match.group(1)] = set()
        #TODO: handle error? (no state changed)

    def process_class_line(self, line, file_obj):
        """Handle a 'class' statement.

        The first statement seen for a class only registers it with no
        permissions.  A later one adds the permissions of the inherited
        common, if any, and those listed inside '{ }'.  file_obj must be
        positioned just after line.
        """
        match = re.search(r'^class\s([^\s]+)\s(.*$)', line)
        if not match:
            return
        declared_class = match.group(1)
        # first class declaration has no perms
        if declared_class not in self.classes:
            self.classes[declared_class] = set()
            return

        # need to parse file from after class name until end of '{ }'s
        _rewind(file_obj, len(line) - match.start(2))
        c = advance_past_whitespace(file_obj)
        if c not in ('i', '{'):
            print("ERROR: invalid class statement")
            return
        if c == 'i':
            # add inherited permissions
            inherited = process_inherits_segment(file_obj)
            if inherited is None:
                return
            if inherited in self.common_classes:
                self.classes[declared_class] |= self.common_classes[inherited]
            else:
                print("ERROR: class " + declared_class +
                      " inherits unknown common " + inherited)
            c = advance_past_whitespace(file_obj)
        if c == '{':
            # expand_brackets() has already removed any comments.
            self.classes[declared_class].update(expand_brackets(file_obj).split())

    def process_common_line(self, line, file_obj):
        """Handle a 'common <name> { perms }' statement.

        file_obj must be positioned just after line.
        """
        match = re.search(r'^common\s([^\s]+)(.*$)', line)
        if not match:
            return
        declared_common_class = match.group(1)
        #TODO: common classes should only be declared once...
        if declared_common_class not in self.common_classes:
            self.common_classes[declared_common_class] = set()
        # need to parse file from after common_class name until end of '{ }'s
        _rewind(file_obj, len(line) - match.start(2))
        c = advance_past_whitespace(file_obj)
        if c != '{':
            print("ERROR: invalid common statement")
            return
        self.common_classes[declared_common_class].update(
            expand_brackets(file_obj).split())

    def process_avc_rule_line(self, line, file_obj):
        """Parse an allow/neverallow rule and store it on this policy.

        file_obj must be positioned just after line.  The rule is dropped
        (after printing a diagnostic) when one of its components is missing.
        """
        match = re.search(r'^(never)?allow\s(.*$)', line)
        if not match:
            return
        rule_type = 'neverallow' if match.group(1) else 'allow'

        # need to parse file from after the keyword until end of '{ }'s
        _rewind(file_obj, len(line) - match.start(2))

        # grab source type(s)
        source_types = get_avc_rule_component(file_obj)
        if len(source_types['set']) == 0:
            print("ERROR: no source types for avc rule at line: " + line)
            return

        # grab target type(s)
        target_types = get_avc_rule_component(file_obj)
        if len(target_types['set']) == 0:
            print("ERROR: no target types for avc rule at line: " + line)
            return

        # skip ':' potentially already handled by advance_until_whitespace
        c = advance_past_whitespace(file_obj)
        if c == ':':
            file_obj.read(1)

        # grab class(es)
        classes = get_avc_rule_component(file_obj)
        if len(classes['set']) == 0:
            print("ERROR: no classes for avc rule at line: " + line)
            return

        # grab permission(s)
        permissions = get_avc_rule_component(file_obj)
        if len(permissions['set']) == 0:
            print("ERROR: no permissions for avc rule at line: " + line)
            return

        rule_dict = {
            'source_types': source_types,
            'target_types': target_types,
            'classes': classes,
            'permissions': permissions,
        }
        if rule_type == 'allow':
            self.allow_rules.append(rule_dict)
        else:
            self.neverallow_rules.append(rule_dict)

    def _add_type_to_attributes(self, declared_type, attribute_list):
        """Add declared_type to every attribute in a comma-separated list.

        All whitespace in attribute_list is ignored and empty entries are
        skipped.  Attributes that have not been declared yet are created.
        """
        for a in ''.join(attribute_list.split()).split(','):
            if not a:
                continue
            #TODO: handle error? attribute should already exist
            self.attributes.setdefault(a, set()).add(declared_type)

    def process_type_line(self, line):
        """Handle a 'type <name>[, attr, ...];' declaration."""
        #TODO: add support for aliases (not yet in current policy.conf)
        match = re.search(r'^type\s([^,]+),?(.*);', line)
        if not match:
            #TODO: handle error? (no state changed)
            return
        # Extra whitespace around the name would otherwise end up in it.
        declared_type = match.group(1).strip()
        self.types.add(declared_type)
        if match.group(2):
            self._add_type_to_attributes(declared_type, match.group(2))

    def process_typeattribute_line(self, line):
        """Handle a 'typeattribute <type> attr[, attr, ...];' statement."""
        match = re.search(r'^typeattribute\s([^\s]+)\s(.*);', line)
        if not match:
            #TODO: handle error? (no state changed)
            return
        declared_type = match.group(1)
        if declared_type not in self.types:
            #TODO: handle error? type should already exist
            self.types.add(declared_type)
        if match.group(2):
            self._add_type_to_attributes(declared_type, match.group(2))