#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
import argparse
import os
import sys

import audit_log_analysis as audit_policy
import generate_code_from_policy as gen_policy
23
24
class MergePolicy:
    """Merge several seccomp policy files into one policy text.

    The source files are split in two groups by name: files whose base name
    starts with ``libsyscall_to_nr_`` feed the syscall name -> number table,
    and files ending in ``.policy`` are parsed section by section. The merged
    result is handed to ``audit_policy.gen_output_file``.
    """

    def __init__(self):
        # Name of the '@' section currently being parsed; '' until a
        # supported section header has been seen.
        self.cur_parse_item = ''
        # Architectures used for lines that carry no ';arch' suffix.
        # NOTE(review): nothing in this class ever adds to this set, so such
        # lines are currently ignored -- confirm that this is intended.
        self.arches = set()
        # Maps architecture name -> gen_policy.SeccompPolicyParam.
        self.seccomp_policy_param = {}

    def update_parse_item(self, line):
        """Switch the current section if *line* names a supported one.

        The first character of *line* (the '@' marker checked by the caller)
        is dropped before the lookup. An unsupported name leaves the current
        section unchanged.
        """
        item = line[1:]
        if item in gen_policy.supported_parse_item:
            self.cur_parse_item = item
            print('start deal with {}'.format(self.cur_parse_item))

    def parse_line(self, line):
        """Feed one policy entry to the per-architecture section handlers.

        All spaces are removed first. Text after the last ';' is a
        comma-separated architecture list; when its first entry is ``all``
        the entry applies to every supported architecture. Without a ';' the
        whole line is applied to ``self.arches``. Nothing happens while no
        section is active.
        """
        if not self.cur_parse_item:
            return
        line = line.replace(' ', '')
        pos = line.rfind(';')
        if pos < 0:
            arches, payload = self.arches, line
        else:
            arches = line[pos + 1:].split(',')
            if arches[0] == 'all':
                arches = gen_policy.supported_architecture
            payload = line[:pos]
        for arch in arches:
            handler = self.seccomp_policy_param.get(arch).value_function.get(self.cur_parse_item)
            handler(payload)

    @staticmethod
    def get_item_content(name_nr_table, item_str, itme_dict):
        """Render one policy section as text.

        Args:
            name_nr_table: maps architecture -> {syscall name: number}.
            item_str: section header line, e.g. ``@allowList``.
            itme_dict: maps architecture -> iterable of entries; an entry is
                either a syscall name or ``name:<rest>``.

        Returns:
            ``''`` when no architecture has any entry. Otherwise the header
            followed by one ``entry;suffix`` line per entry: arm64 entries
            first, ordered by arm64 syscall number (``all`` when the same
            entry also exists for arm, else ``arm64``), then the entries
            only present for arm, ordered by arm syscall number.

        NOTE(review): only the 'arm64' and 'arm' keys are emitted; entries
        of any other supported architecture are not written.
        """
        sorted_items = {}
        for arch in gen_policy.supported_architecture:
            nr_table = name_nr_table.get(arch)
            item_to_nr = {}
            for item in itme_dict.get(arch):
                # The syscall name is the part before the first ':'.
                func_name = item[:item.find(':')].strip() if ':' in item else item
                item_to_nr[item] = nr_table.get(func_name)
            sorted_items[arch] = sorted(item_to_nr.items(), key=lambda pair: pair[1])

        if not any(sorted_items.values()):
            return ''

        # Entries are unique per architecture (they were dict keys), so set
        # membership replaces the former nested scan plus list.remove().
        arm64_items = [item for item, _ in sorted_items.get('arm64', [])]
        arm_items = [item for item, _ in sorted_items.get('arm', [])]
        arm64_set = set(arm64_items)
        arm_set = set(arm_items)

        lines = [item_str]
        for item in arm64_items:
            suffix = 'all' if item in arm_set else 'arm64'
            lines.append('{};{}'.format(item, suffix))
        lines.extend('{};arm'.format(item) for item in arm_items if item not in arm64_set)
        return '\n'.join(lines) + '\n'

    def parse_open_file(self, fp):
        """Parse every line yielded by *fp*.

        Blank lines and lines starting with '#' are skipped, lines starting
        with '@' select the section, and all other lines go to
        ``parse_line`` (which ignores them until a section is active).
        """
        for raw_line in fp:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            if line.startswith('@'):
                self.update_parse_item(line)
                continue
            self.parse_line(line)

    def parse_file(self, file_path):
        """Open *file_path* as UTF-8 text and parse it."""
        with open(file_path, encoding='utf-8') as fp:
            self.parse_open_file(fp)

    def merge_policy(self, args):
        """Build the merged policy text and write it out.

        Args:
            args: namespace with ``src_files`` (iterable of paths) and
                ``filter_name`` (passed to ``audit_policy.gen_output_file``).
        """
        name_nr_table = {}
        for file_name in args.src_files:
            if os.path.basename(file_name).lower().startswith('libsyscall_to_nr_'):
                gen_policy.gen_syscall_nr_table(file_name, name_nr_table)

        arches = gen_policy.supported_architecture
        for arch in arches:
            self.seccomp_policy_param[arch] = gen_policy.SeccompPolicyParam(
                arch, name_nr_table.get(arch))

        for file_name in args.src_files:
            if file_name.lower().endswith('.policy'):
                self.parse_file(file_name)

        params = self.seccomp_policy_param
        sections = (
            ("@priority",
             {arch: params.get(arch).priority for arch in arches}),
            ("@allowList",
             {arch: params.get(arch).allow_list for arch in arches}),
            ("@priorityWithArgs",
             {arch: params.get(arch).priority_with_args for arch in arches}),
            ("@allowListWithArgs",
             {arch: params.get(arch).allow_list_with_args for arch in arches}),
            ("@blockList",
             {arch: params.get(arch).blocklist for arch in arches}),
        )
        content = ''.join(
            self.get_item_content(name_nr_table, header, items)
            for header, items in sections)
        audit_policy.gen_output_file(args.filter_name, content)
143
144
def main(argv=None):
    """Parse the command line and write the merged seccomp policy.

    Args:
        argv: optional list of argument strings; when ``None`` argparse
            reads ``sys.argv[1:]``.

    ``--src-files`` may be repeated and is required: without it there is
    nothing to iterate over in ``MergePolicy.merge_policy``.
    """
    parser = argparse.ArgumentParser(
        description='Generates a seccomp-bpf policy')
    parser.add_argument('--src-files', type=str, action='append', required=True,
                        help='input libsyscall_to_nr files and policy files')

    parser.add_argument('--filter-name', type=str,
                        help='Name of seccomp bpf array generated by this script')

    args = parser.parse_args(argv)

    generator = MergePolicy()
    generator.merge_policy(args)
158
159
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
162