#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys
import argparse
import audit_log_analysis as audit_policy
import generate_code_from_policy as gen_policy


class MergePolicy:
    """Merge several seccomp ``.policy`` files into one combined policy text.

    Policy lines are parsed section by section (a section starts with an
    ``@name`` header) and handed to one ``gen_policy.SeccompPolicyParam`` per
    architecture. The collected entries are then rendered back into policy
    syntax, tagging each entry with ``;all``, ``;arm64`` or ``;arm``.
    """

    # (section header written to the output,
    #  name of the SeccompPolicyParam attribute holding that section's entries)
    _OUTPUT_SECTIONS = (
        ('@priority', 'priority'),
        ('@allowList', 'allow_list'),
        ('@priorityWithArgs', 'priority_with_args'),
        ('@allowListWithArgs', 'allow_list_with_args'),
        ('@blockList', 'blocklist'),
    )

    def __init__(self):
        # Name of the section currently being parsed; '' means "none yet".
        self.cur_parse_item = ''
        # Architectures applied to lines that carry no ';arch' suffix.
        # NOTE(review): nothing in this file ever adds to this set, so such
        # lines are currently ignored - confirm that this is intended.
        self.arches = set()
        # arch name -> gen_policy.SeccompPolicyParam
        self.seccomp_policy_param = dict()

    def update_parse_item(self, line):
        """Switch the current section if ``line`` names a supported one.

        ``line`` is a section header including its leading marker character
        (the caller passes lines starting with ``@``). A header that is not in
        ``gen_policy.supported_parse_item`` leaves the current section
        unchanged.
        """
        item = line[1:]
        if item in gen_policy.supported_parse_item:
            self.cur_parse_item = item
            print('start deal with {}'.format(self.cur_parse_item))

    def parse_line(self, line):
        """Dispatch one policy entry to the handler of the current section.

        Spaces are removed first. Text after the last ``;`` is read as a
        comma separated architecture list, where a leading ``all`` selects
        every entry of ``gen_policy.supported_architecture``. Without a ``;``
        the entry is applied to ``self.arches``.

        Raises:
            ValueError: if an architecture has no entry in
                ``self.seccomp_policy_param``.
        """
        if not self.cur_parse_item:
            return

        line = line.replace(' ', '')
        pos = line.rfind(';')
        if pos < 0:
            arches = self.arches
            entry = line
        else:
            arches = line[pos + 1:].split(',')
            if arches[0] == 'all':
                arches = gen_policy.supported_architecture
            entry = line[:pos]

        for arch in arches:
            param = self.seccomp_policy_param.get(arch)
            if param is None:
                # Fail with the offending input instead of an opaque
                # "'NoneType' object has no attribute 'value_function'".
                raise ValueError(
                    'unsupported architecture {!r} in policy line {!r}'.format(arch, line))
            param.value_function.get(self.cur_parse_item)(entry)

    @staticmethod
    def get_item_content(name_nr_table, item_str, itme_dict):
        """Render one policy section as text.

        Args:
            name_nr_table: mapping of architecture -> mapping of syscall
                name -> syscall number.
            item_str: section header line to emit, e.g. ``@allowList``.
            itme_dict: mapping of architecture -> iterable of policy entries.
                An entry is either a syscall name or ``name:<rest>``; the part
                before the first ``:`` is used to look up the syscall number.

        Returns:
            ``''`` when no supported architecture has any entry. Otherwise the
            header line followed by one line per entry: entries present for
            both ``arm64`` and ``arm`` are suffixed ``;all``, the others
            ``;arm64`` or ``;arm``. ``arm64``/shared entries come first in
            ``arm64`` syscall-number order, then the ``arm``-only entries in
            ``arm`` syscall-number order.
        """
        def by_syscall_nr(pair):
            # Entries whose number is unknown (None) sort last instead of
            # raising TypeError when compared against real numbers.
            nr = pair[1]
            return (nr is None, 0 if nr is None else nr)

        sorted_by_arch = {}
        for arch in gen_policy.supported_architecture:
            nr_table = name_nr_table.get(arch) or {}
            entry_to_nr = {}
            for entry in itme_dict.get(arch) or ():
                if ':' in entry:
                    func_name = entry[:entry.find(':')].strip()
                else:
                    func_name = entry
                entry_to_nr[entry] = nr_table.get(func_name)
            sorted_by_arch[arch] = sorted(entry_to_nr.items(), key=by_syscall_nr)

        if not any(sorted_by_arch.get(arch) for arch in gen_policy.supported_architecture):
            return ''

        # NOTE(review): only 'arm64' and 'arm' are rendered; entries of any
        # other architecture in gen_policy.supported_architecture are not
        # written to the output.
        arm64_entries = sorted_by_arch.get('arm64') or []
        arm_entries = sorted_by_arch.get('arm') or []
        arm_names = {entry for entry, _ in arm_entries}

        lines = [item_str]
        shared = set()
        for entry, _ in arm64_entries:
            if entry in arm_names:
                shared.add(entry)
                lines.append('{};all'.format(entry))
            else:
                lines.append('{};arm64'.format(entry))
        lines.extend('{};arm'.format(entry) for entry, _ in arm_entries if entry not in shared)

        return '\n'.join(lines) + '\n'

    def parse_open_file(self, fp):
        """Parse every line of an already opened policy file.

        Blank lines and lines starting with ``#`` are skipped, lines starting
        with ``@`` select the section, and all other lines are passed to
        ``parse_line`` (which ignores them until a section has been selected).
        """
        for raw_line in fp:
            line = raw_line.strip()
            if not line or line[0] == '#':
                continue
            if line[0] == '@':
                self.update_parse_item(line)
                continue
            self.parse_line(line)

    def parse_file(self, file_path):
        """Open ``file_path`` and parse it as a policy file."""
        with open(file_path) as fp:
            self.parse_open_file(fp)

    def merge_policy(self, args):
        """Merge all input policies and pass the result to the output writer.

        Args:
            args: object with ``src_files`` (list of input paths) and
                ``filter_name`` attributes. Files whose base name starts with
                ``libsyscall_to_nr_`` (case-insensitive) feed the syscall
                number table; files ending in ``.policy`` (case-insensitive)
                are parsed as policies.
        """
        function_name_nr_table_dict = {}
        for file_name in args.src_files:
            # basename() copes with the platform's path separators.
            if not os.path.basename(file_name).lower().startswith('libsyscall_to_nr_'):
                continue
            gen_policy.gen_syscall_nr_table(file_name, function_name_nr_table_dict)

        for arch in gen_policy.supported_architecture:
            self.seccomp_policy_param.update(
                {arch: gen_policy.SeccompPolicyParam(arch, function_name_nr_table_dict.get(arch))})

        for file_name in args.src_files:
            if file_name.lower().endswith('.policy'):
                self.parse_file(file_name)

        sections = []
        for header, attr_name in self._OUTPUT_SECTIONS:
            entries_by_arch = {
                arch: getattr(self.seccomp_policy_param.get(arch), attr_name)
                for arch in gen_policy.supported_architecture
            }
            sections.append(
                self.get_item_content(function_name_nr_table_dict, header, entries_by_arch))

        audit_policy.gen_output_file(args.filter_name, ''.join(sections))


def main():
    """Parse the command line and run the merge."""
    parser = argparse.ArgumentParser(
        description='Generates a seccomp-bpf policy')
    # required: without any input file the merge used to fail with
    # "TypeError: 'NoneType' object is not iterable"; let argparse report it.
    parser.add_argument('--src-files', type=str, action='append', required=True,
                        help=('input libsyscall_to_nr files and policy filse\n'))

    parser.add_argument('--filter-name', type=str,
                        help='Name of seccomp bpf array generated by this script')

    args = parser.parse_args()

    generator = MergePolicy()
    generator.merge_policy(args)


if __name__ == '__main__':
    sys.exit(main())