• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2
3import argparse
4import logging
5import operator
6import os
7import re
8import sys
9import textwrap
10
11from gensyscalls import SysCallsTxtParser
12
13
14BPF_JGE = "BPF_JUMP(BPF_JMP|BPF_JGE|BPF_K, {0}, {1}, {2})"
15BPF_JEQ = "BPF_JUMP(BPF_JMP|BPF_JEQ|BPF_K, {0}, {1}, {2})"
16BPF_ALLOW = "BPF_STMT(BPF_RET|BPF_K, SECCOMP_RET_ALLOW)"
17
18
19class SyscallRange:
20  def __init__(self, name, value):
21    self.names = [name]
22    self.begin = value
23    self.end = self.begin + 1
24
25  def __str__(self):
26    return "(%s, %s, %s)" % (self.begin, self.end, self.names)
27
28  def add(self, name, value):
29    if value != self.end:
30      raise ValueError
31    self.end += 1
32    self.names.append(name)
33
34
35def load_syscall_names_from_file(file_path, architecture):
36  parser = SysCallsTxtParser()
37  parser.parse_open_file(open(file_path))
38  return {x["name"] for x in parser.syscalls if x.get(architecture)}
39
40
41def load_syscall_priorities_from_file(file_path):
42  format_re = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]+)\s*$')
43  priorities = []
44  with open(file_path) as priority_file:
45    for line in priority_file:
46      match = format_re.match(line)
47      if match is None:
48        continue
49      try:
50        name = match.group(1)
51        priorities.append(name)
52      except IndexError:
53        # TODO: This should be impossible becauase it wouldn't have matched?
54        logging.exception('Failed to parse %s from %s', line, file_path)
55
56  return priorities
57
58
59def merge_names(base_names, allowlist_names, blocklist_names):
60  if bool(blocklist_names - base_names):
61    raise RuntimeError("blocklist item not in bionic - aborting " + str(
62        blocklist_names - base_names))
63
64  return (base_names - blocklist_names) | allowlist_names
65
66
67def extract_priority_syscalls(syscalls, priorities):
68  # Extract syscalls that are not in the priority list
69  other_syscalls = \
70    [syscall for syscall in syscalls if syscall[0] not in priorities]
71  # For prioritized syscalls, keep the order in which they appear in th
72  # priority list
73  syscall_dict = {syscall[0]: syscall[1] for syscall in syscalls}
74  priority_syscalls = []
75  for name in priorities:
76    if name in syscall_dict.keys():
77      priority_syscalls.append((name, syscall_dict[name]))
78  return priority_syscalls, other_syscalls
79
80
81def parse_syscall_NRs(names_path):
82  # The input is now the preprocessed source file. This will contain a lot
83  # of junk from the preprocessor, but our lines will be in the format:
84  #
85  #    #define __(ARM_)?NR_${NAME} ${VALUE}
86  #
87  # Where ${VALUE} is a preprocessor expression.
88  #
89  # Newer architectures have things like this though:
90  #
91  #    #define __NR3264_fcntl 25
92  #    #define __NR_fcntl __NR3264_fcntl
93  #
94  # So we need to keep track of the __NR3264_* constants and substitute them.
95
96  line_re = re.compile(r'^# \d+ ".*".*')
97  undef_re = re.compile(r'^#undef\s.*')
98  define_re = re.compile(r'^\s*#define\s+([A-Za-z0-9_(,)]+)(?:\s+(.+))?\s*$')
99  token_re = re.compile(r'\b[A-Za-z_][A-Za-z0-9_]+\b')
100  constants = {}
101  nr3264s = {}
102  with open(names_path) as f:
103    for line in f:
104      line = line.strip()
105      m = define_re.match(line)
106      if m:
107        name = m.group(1)
108        value = m.group(2)
109        if name.startswith('__NR3264'):
110          nr3264s[name] = value
111        elif name.startswith('__NR_') or name.startswith('__ARM_NR_'):
112          if value in nr3264s:
113            value = nr3264s[value]
114          # eval() takes care of any arithmetic that may be done
115          value = eval(token_re.sub(lambda x: str(constants[x.group(0)]), value))
116
117          constants[name] = value
118      else:
119        if not line_re.match(line) and not undef_re.match(line) and line:
120          print('%s: failed to parse line `%s`' % (names_path, line))
121          sys.exit(1)
122
123  syscalls = {}
124  for name, value in constants.items():
125    # Remove the __NR_ prefix.
126    # TODO: why not __ARM_NR too?
127    if name.startswith("__NR_"):
128      name = name[len("__NR_"):]
129    syscalls[name] = value
130
131  return syscalls
132
133
134def convert_NRs_to_ranges(syscalls):
135  # Sort the values so we convert to ranges and binary chop
136  syscalls = sorted(syscalls, key=operator.itemgetter(1))
137
138  # Turn into a list of ranges. Keep the names for the comments
139  ranges = []
140  for name, value in syscalls:
141    if not ranges:
142      ranges.append(SyscallRange(name, value))
143      continue
144
145    last_range = ranges[-1]
146    if last_range.end == value:
147      last_range.add(name, value)
148    else:
149      ranges.append(SyscallRange(name, value))
150  return ranges
151
152
153# Converts the sorted ranges of allowed syscalls to a binary tree bpf
154# For a single range, output a simple jump to {fail} or {allow}. We can't set
155# the jump ranges yet, since we don't know the size of the filter, so use a
156# placeholder
157# For multiple ranges, split into two, convert the two halves and output a jump
158# to the correct half
159def convert_to_intermediate_bpf(ranges):
160  if len(ranges) == 1:
161    # We will replace {fail} and {allow} with appropriate range jumps later
162    return [BPF_JGE.format(ranges[0].end, "{fail}", "{allow}") +
163            ", //" + "|".join(ranges[0].names)]
164
165  half = (len(ranges) + 1) // 2
166  first = convert_to_intermediate_bpf(ranges[:half])
167  second = convert_to_intermediate_bpf(ranges[half:])
168  jump = [BPF_JGE.format(ranges[half].begin, len(first), 0) + ","]
169  return jump + first + second
170
171
172# Converts the prioritized syscalls to a bpf list that  is prepended to the
173# tree generated by convert_to_intermediate_bpf(). If we hit one of these
174# syscalls, shortcut to the allow statement at the bottom of the tree
175# immediately
176def convert_priority_to_intermediate_bpf(priority_syscalls):
177  result = []
178  for syscall in priority_syscalls:
179    result.append(BPF_JEQ.format(syscall[1], "{allow}", 0) +
180                  ", //" + syscall[0])
181  return result
182
183
184def convert_ranges_to_bpf(ranges, priority_syscalls):
185  bpf = convert_priority_to_intermediate_bpf(priority_syscalls) + \
186    convert_to_intermediate_bpf(ranges)
187
188  # Now we know the size of the tree, we can substitute the {fail} and {allow}
189  # placeholders
190  for i, statement in enumerate(bpf):
191    # Replace placeholder with
192    # "distance to jump to fail, distance to jump to allow"
193    # We will add a kill statement and an allow statement after the tree
194    # With bpfs jmp 0 means the next statement, so the distance to the end is
195    # len(bpf) - i - 1, which is where we will put the kill statement, and
196    # then the statement after that is the allow statement
197    bpf[i] = statement.format(fail=str(len(bpf) - i),
198                              allow=str(len(bpf) - i - 1))
199
200  # Add the allow calls at the end. If the syscall is not matched, we will
201  # continue. This allows the user to choose to match further syscalls, and
202  # also to choose the action when we want to block
203  bpf.append(BPF_ALLOW + ",")
204
205  # Add check that we aren't off the bottom of the syscalls
206  bpf.insert(0, BPF_JGE.format(ranges[0].begin, 0, str(len(bpf))) + ',')
207  return bpf
208
209
210def convert_bpf_to_output(bpf, architecture, name_modifier):
211  if name_modifier:
212    name_modifier = name_modifier + "_"
213  else:
214    name_modifier = ""
215  header = textwrap.dedent("""\
216    // File autogenerated by {self_path} - edit at your peril!!
217
218    #include <linux/filter.h>
219    #include <errno.h>
220
221    #include "seccomp/seccomp_bpfs.h"
222    const sock_filter {architecture}_{suffix}filter[] = {{
223    """).format(self_path=os.path.basename(__file__), architecture=architecture,
224                suffix=name_modifier)
225
226  footer = textwrap.dedent("""\
227
228    }};
229
230    const size_t {architecture}_{suffix}filter_size = sizeof({architecture}_{suffix}filter) / sizeof(struct sock_filter);
231    """).format(architecture=architecture,suffix=name_modifier)
232  return header + "\n".join(bpf) + footer
233
234
235def construct_bpf(syscalls, architecture, name_modifier, priorities):
236  priority_syscalls, other_syscalls = \
237    extract_priority_syscalls(syscalls, priorities)
238  ranges = convert_NRs_to_ranges(other_syscalls)
239  bpf = convert_ranges_to_bpf(ranges, priority_syscalls)
240  return convert_bpf_to_output(bpf, architecture, name_modifier)
241
242
243def gen_policy(name_modifier, out_dir, base_syscall_file, syscall_files,
244               syscall_NRs, priority_file):
245  for arch in syscall_NRs.keys():
246    base_names = load_syscall_names_from_file(base_syscall_file, arch)
247    allowlist_names = set()
248    blocklist_names = set()
249    for f in syscall_files:
250      if "blocklist" in f.lower():
251        blocklist_names |= load_syscall_names_from_file(f, arch)
252      else:
253        allowlist_names |= load_syscall_names_from_file(f, arch)
254    priorities = []
255    if priority_file:
256      priorities = load_syscall_priorities_from_file(priority_file)
257
258    allowed_syscalls = []
259    for name in sorted(merge_names(base_names, allowlist_names, blocklist_names)):
260      try:
261        allowed_syscalls.append((name, syscall_NRs[arch][name]))
262      except:
263        logging.exception("Failed to find %s in %s (%s)", name, arch, syscall_NRs[arch])
264        raise
265    output = construct_bpf(allowed_syscalls, arch, name_modifier, priorities)
266
267    # And output policy
268    filename_modifier = "_" + name_modifier if name_modifier else ""
269    output_path = os.path.join(out_dir,
270                               "{}{}_policy.cpp".format(arch, filename_modifier))
271    with open(output_path, "w") as output_file:
272      output_file.write(output)
273
274
275def main():
276  parser = argparse.ArgumentParser(
277      description="Generates a seccomp-bpf policy")
278  parser.add_argument("--verbose", "-v", help="Enables verbose logging.")
279  parser.add_argument("--name-modifier",
280                      help=("Specifies the name modifier for the policy. "
281                            "One of {app,system}."))
282  parser.add_argument("--out-dir",
283                      help="The output directory for the policy files")
284  parser.add_argument("base_file", metavar="base-file", type=str,
285                      help="The path of the base syscall list (SYSCALLS.TXT).")
286  parser.add_argument("files", metavar="FILE", type=str, nargs="+",
287                      help=("The path of the input files. In order to "
288                            "simplify the build rules, it can take any of the "
289                            "following files: \n"
290                            "* /blocklist.*\\.txt$/ syscall blocklist.\n"
291                            "* /allowlist.*\\.txt$/ syscall allowlist.\n"
292                            "* /priority.txt$/ priorities for bpf rules.\n"
293                            "* otherwise, syscall name-number mapping.\n"))
294  args = parser.parse_args()
295
296  if args.verbose:
297    logging.basicConfig(level=logging.DEBUG)
298  else:
299    logging.basicConfig(level=logging.INFO)
300
301  syscall_files = []
302  priority_file = None
303  syscall_NRs = {}
304  for filename in args.files:
305    if filename.lower().endswith('.txt'):
306      if filename.lower().endswith('priority.txt'):
307        priority_file = filename
308      else:
309        syscall_files.append(filename)
310    else:
311      m = re.search(r"libseccomp_gen_syscall_nrs_([^/]+)", filename)
312      syscall_NRs[m.group(1)] = parse_syscall_NRs(filename)
313
314  gen_policy(name_modifier=args.name_modifier, out_dir=args.out_dir,
315             syscall_NRs=syscall_NRs, base_syscall_file=args.base_file,
316             syscall_files=syscall_files, priority_file=priority_file)
317
318
319if __name__ == "__main__":
320  main()
321