#!/usr/bin/env python3
# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# This script will take any number of trace files generated by strace(1)
# and output a system call filtering policy suitable for use with Minijail.

"""Tool to generate a minijail seccomp filter from strace or audit output."""

import argparse
import collections
import datetime
import os
import re
import sys


# auparse may not be installed and is currently optional.
try:
    import auparse
except ImportError:
    auparse = None


YEAR = datetime.datetime.now().year
NOTICE = f"""# Copyright {YEAR} The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""

ALLOW = "1"

# This ignores any leading PID tag and trailing <unfinished ...>, and extracts
# the syscall name and the argument list.
LINE_RE = re.compile(r"^\s*(?:\[[^]]*\]|\d+)?\s*([a-zA-Z0-9_]+)\(([^)<]*)")

# Matches the placeholder name produced when a syscall number could not be
# interpreted, e.g. 'unknown-syscall(983045)', and captures the number.
UNKNOWN_SYSCALL_RE = re.compile(r"unknown-syscall\((?P<syscall_num>\d+)\)")

SOCKETCALLS = {
    "accept",
    "bind",
    "connect",
    "getpeername",
    "getsockname",
    "getsockopt",
    "listen",
    "recv",
    "recvfrom",
    "recvmsg",
    "send",
    "sendmsg",
    "sendto",
    "setsockopt",
    "shutdown",
    "socket",
    "socketpair",
}

# List of private ARM syscalls. These can be found in any ARM specific unistd.h
# such as Linux's arch/arm/include/uapi/asm/unistd.h.
PRIVATE_ARM_SYSCALLS = {
    983041: "ARM_breakpoint",
    983042: "ARM_cacheflush",
    983043: "ARM_usr26",
    983044: "ARM_usr32",
    983045: "ARM_set_tls",
}

ArgInspectionEntry = collections.namedtuple(
    "ArgInspectionEntry", ("arg_index", "value_set")
)


# pylint: disable=too-few-public-methods
class BucketInputFiles(argparse.Action):
    """Buckets input files using simple content based heuristics.

    Attributes:
        audit_logs: Mutually exclusive list of audit log filenames.
        traces: Mutually exclusive list of strace log filenames.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Sorts |values| into namespace.traces and namespace.audit_logs.

        Each file is classified by the first line that matches either the
        strace or the audit heuristic. Files matching neither are treated as
        strace logs. A missing file is reported through parser.error().
        """
        audit_logs = []
        traces = []

        strace_line_re = re.compile(r"[a-z]+[0-9]*\(.+\) += ")
        audit_line_re = re.compile(r"type=(SYSCALL|SECCOMP)")

        for filename in values:
            if not os.path.exists(filename):
                parser.error(f"Input file {filename} not found.")
            with open(filename, mode="r", encoding="utf-8") as input_file:
                # Iterate lazily: the first matching line decides the bucket,
                # so there is no need to read the whole file up front.
                for line in input_file:
                    if strace_line_re.search(line):
                        traces.append(filename)
                        break
                    if audit_line_re.search(line):
                        audit_logs.append(filename)
                        break
                else:
                    # Treat it as an strace log to retain legacy behaviour and
                    # also just in case the strace regex is imperfect.
                    traces.append(filename)

        setattr(namespace, "audit_logs", audit_logs)
        setattr(namespace, "traces", traces)


# pylint: enable=too-few-public-methods


def parse_args(argv):
    """Returns the parsed CLI arguments for this tool.

    Args:
        argv: List of command line arguments (without the program name).

    Returns:
        The argparse namespace. In addition to the declared options it carries
        'traces' and 'audit_logs', populated by BucketInputFiles.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="output informational messages to stderr",
    )
    parser.add_argument(
        "--frequency", type=argparse.FileType("w"), help="frequency file"
    )
    parser.add_argument(
        "--policy",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="policy file",
    )
    parser.add_argument(
        "input-logs",
        action=BucketInputFiles,
        help="strace and/or audit logs",
        nargs="+",
    )
    parser.add_argument(
        "--audit-comm",
        type=str,
        metavar="PROCESS_NAME",
        help="relevant process name from the audit.log files",
    )
    opts = parser.parse_args(argv)

    if opts.audit_logs and not auparse:
        parser.error(
            "Python bindings for the audit subsystem were not found.\n"
            "Please install the python3-audit (sometimes python-audit)"
            " package for your distro to process audit logs: "
            f"{opts.audit_logs}"
        )

    if opts.audit_logs and not opts.audit_comm:
        parser.error(
            f"--audit-comm is required when using audit logs as input:"
            f" {opts.audit_logs}"
        )

    if not opts.audit_logs and opts.audit_comm:
        parser.error(
            "--audit-comm was specified yet none of the input files "
            "matched our hueristic for an audit log"
        )

    return opts


def get_seccomp_bpf_filter(syscall, entry):
    """Returns a minijail seccomp-bpf filter expression for the syscall.

    Args:
        syscall: Name of the syscall the expression is generated for.
        entry: ArgInspectionEntry holding the inspected argument index and the
            set of values observed for that argument.

    Returns:
        The observed values joined with ' || '. For mmap/mmap2/mprotect where
        no observed value combines PROT_EXEC and PROT_WRITE, a concise
        expression forbidding that combination is returned instead.
    """
    arg_index = entry.arg_index
    arg_values = entry.value_set
    atoms = []
    if syscall in ("mmap", "mmap2", "mprotect") and arg_index == 2:
        # See if there is at least one instance of any of these syscalls trying
        # to map memory with both PROT_EXEC and PROT_WRITE. If there isn't, we
        # can craft a concise expression to forbid this.
        write_and_exec = {"PROT_EXEC", "PROT_WRITE"}
        for arg_value in arg_values:
            if write_and_exec.issubset(
                {p.strip() for p in arg_value.split("|")}
            ):
                break
        else:
            atoms.extend(["arg2 in ~PROT_EXEC", "arg2 in ~PROT_WRITE"])
            arg_values = set()
    # Sets have no stable iteration order for strings across interpreter
    # runs, so sort to keep the generated policy reproducible.
    atoms.extend(
        f"arg{arg_index} == {arg_value}" for arg_value in sorted(arg_values)
    )
    return " || ".join(atoms)


def parse_trace_file(trace_filename, syscalls, arg_inspection):
    """Parses one file produced by strace.

    Args:
        trace_filename: Path of the strace log. The name is also used as a
            heuristic to decide whether socket calls go through socketcall.
        syscalls: Mapping of syscall name to occurrence count; updated in
            place.
        arg_inspection: Mapping of syscall name to ArgInspectionEntry; the
            value sets are updated in place.
    """
    uses_socketcall = "i386" in trace_filename or (
        "x86" in trace_filename and "64" not in trace_filename
    )

    # Build the lookup once instead of scanning the dict values per line.
    private_arm_names = set(PRIVATE_ARM_SYSCALLS.values())

    with open(trace_filename, encoding="utf-8") as trace_file:
        for line in trace_file:
            matches = LINE_RE.match(line)
            if not matches:
                continue

            syscall, args = matches.groups()
            if uses_socketcall and syscall in SOCKETCALLS:
                syscall = "socketcall"

            # strace omits the 'ARM_' prefix on all private ARM syscalls. Add
            # it manually here as a workaround. These syscalls are exclusive
            # to ARM so we don't need to predicate this on a trace_filename
            # based heuristic for the arch.
            if f"ARM_{syscall}" in private_arm_names:
                syscall = f"ARM_{syscall}"

            syscalls[syscall] += 1

            args = [arg.strip() for arg in args.split(",")]

            if syscall in arg_inspection:
                arg_value = args[arg_inspection[syscall].arg_index]
                arg_inspection[syscall].value_set.add(arg_value)


def parse_audit_log(audit_log, audit_comm, syscalls, arg_inspection):
    """Parses one audit.log file generated by the Linux audit subsystem.

    Args:
        audit_log: Path of the audit log file.
        audit_comm: Process name (the 'comm' field) whose records are
            relevant.
        syscalls: Mapping of syscall name to occurrence count; updated in
            place.
        arg_inspection: Mapping of syscall name to ArgInspectionEntry; the
            value sets are updated in place.

    Raises:
        ValueError: The file has no readable record, or a relevant record
            lacks the 'syscall' field or the inspected argument field.
    """
    au = auparse.AuParser(auparse.AUSOURCE_FILE, audit_log)
    # Quick validity check for whether this parses as a valid audit log. The
    # first event should have at least one record.
    if not au.first_record():
        # audit_log is the filename itself, so it is formatted directly.
        raise ValueError(f"Unable to parse audit log file {audit_log}")

    # Iterate through events where _any_ contained record matches
    # ((type == SECCOMP || type == SYSCALL) && comm == audit_comm).
    au.search_add_item("type", "=", "SECCOMP", auparse.AUSEARCH_RULE_CLEAR)
    au.search_add_item("type", "=", "SYSCALL", auparse.AUSEARCH_RULE_OR)
    au.search_add_item(
        "comm", "=", f'"{audit_comm}"', auparse.AUSEARCH_RULE_AND
    )

    # auparse_find_field(3) will ignore preceding fields in the record and
    # at the same time happily cross record boundaries when looking for the
    # field. This helper method always seeks the cursor back to the first
    # field in the record and stops searching before crossing over to the
    # next record; making the search far less error prone.
    # Also implicitly seeks the internal 'cursor' to the matching field
    # for any subsequent calls like auparse_interpret_field.
    def _find_field_in_current_record(name):
        au.first_field()
        while True:
            if au.get_field_name() == name:
                return au.get_field_str()
            if not au.next_field():
                return None

    while au.search_next_event():
        # The event may have multiple records. Loop through all.
        au.first_record()
        for _ in range(au.get_num_records()):
            event_type = _find_field_in_current_record("type")
            comm = _find_field_in_current_record("comm")
            # Some of the records in this event may not be relevant
            # despite the event-specific search filter. Skip those.
            if (
                event_type not in ("SECCOMP", "SYSCALL")
                or comm != f'"{audit_comm}"'
            ):
                au.next_record()
                continue

            if not _find_field_in_current_record("syscall"):
                raise ValueError(
                    f'Could not find field "syscall" in event of '
                    f"type {event_type}"
                )
            # Interpret the syscall field that's under our 'cursor' following
            # the find. Interpreting fields yields human friendly names
            # instead of integers. E.g '16' -> 'ioctl'.
            syscall = au.interpret_field()

            # TODO(crbug/1172449): Add these syscalls to upstream
            # audit-userspace and remove this workaround.
            # This is redundant but safe for non-ARM architectures due to the
            # disjoint set of private syscall numbers.
            match = UNKNOWN_SYSCALL_RE.match(syscall)
            if match:
                syscall_num = int(match.group("syscall_num"))
                syscall = PRIVATE_ARM_SYSCALLS.get(syscall_num, syscall)

            if (syscall in arg_inspection and event_type == "SECCOMP") or (
                syscall not in arg_inspection and event_type == "SYSCALL"
            ):
                # Skip SECCOMP records for syscalls that require argument
                # inspection. Similarly, skip SYSCALL records for syscalls
                # that do not require argument inspection. Technically such
                # records wouldn't exist per our setup instructions but audit
                # sometimes lets a few records slip through.
                au.next_record()
                continue

            if event_type == "SYSCALL":
                arg_field_name = f"a{arg_inspection[syscall].arg_index}"
                if not _find_field_in_current_record(arg_field_name):
                    raise ValueError(
                        f'Could not find field "{arg_field_name}" '
                        f"in event of type {event_type}"
                    )
                # Interpret the arg field that's under our 'cursor' following
                # the find. This may yield a more human friendly name.
                # E.g '5401' -> 'TCGETS'.
                arg_inspection[syscall].value_set.add(au.interpret_field())

            syscalls[syscall] += 1
            au.next_record()


def main(argv=None):
    """Main entrypoint.

    Args:
        argv: Command line arguments without the program name. Defaults to
            sys.argv[1:] when None.
    """
    if argv is None:
        argv = sys.argv[1:]

    opts = parse_args(argv)

    syscalls = collections.defaultdict(int)

    arg_inspection = {
        "socket": ArgInspectionEntry(0, set()),  # int domain
        "ioctl": ArgInspectionEntry(1, set()),  # int request
        "prctl": ArgInspectionEntry(0, set()),  # int option
        "mmap": ArgInspectionEntry(2, set()),  # int prot
        "mmap2": ArgInspectionEntry(2, set()),  # int prot
        "mprotect": ArgInspectionEntry(2, set()),  # int prot
    }

    if opts.verbose:
        # Print an informational message to stderr in case the filetype
        # detection heuristics are wonky.
        print(
            "Generating a seccomp policy using these input files:",
            file=sys.stderr,
        )
        print(f"Strace logs: {opts.traces}", file=sys.stderr)
        print(f"Audit logs: {opts.audit_logs}", file=sys.stderr)

    for trace_filename in opts.traces:
        parse_trace_file(trace_filename, syscalls, arg_inspection)

    for audit_log in opts.audit_logs:
        parse_audit_log(audit_log, opts.audit_comm, syscalls, arg_inspection)

    # Add the basic set if they are not yet present.
    basic_set = [
        "restart_syscall",
        "exit",
        "exit_group",
        "rt_sigreturn",
    ]
    for basic_syscall in basic_set:
        if basic_syscall not in syscalls:
            syscalls[basic_syscall] = 1

    # If a frequency file isn't used then sort the syscalls based on frequency
    # to make the common case fast (by checking frequent calls earlier).
    # Otherwise, sort alphabetically to make it easier for humans to see which
    # calls are in use (and if necessary manually add a new syscall to the
    # list).
    if opts.frequency is None:
        # sorted() is stable, so ties keep their first-seen order.
        sorted_syscalls = sorted(syscalls, key=syscalls.get, reverse=True)
    else:
        sorted_syscalls = sorted(syscalls)

    print(NOTICE, file=opts.policy)
    if opts.frequency is not None:
        print(NOTICE, file=opts.frequency)

    for syscall in sorted_syscalls:
        if syscall in arg_inspection:
            arg_filter = get_seccomp_bpf_filter(
                syscall, arg_inspection[syscall]
            )
        else:
            arg_filter = ALLOW
        print(f"{syscall}: {arg_filter}", file=opts.policy)
        if opts.frequency is not None:
            print(f"{syscall}: {syscalls[syscall]}", file=opts.frequency)


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))