#!/usr/bin/env python3
# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittests for the parser module."""

import os
import parser  # pylint: disable=wrong-import-order
import shutil
import tempfile
import unittest
from importlib import resources

import arch
import bpf


# Architecture description shared by every test case; loaded once at import
# time from the bundled test data.
ARCH_64 = arch.Arch.load_from_json_bytes(
    resources.files("testdata").joinpath("arch_64.json").read_bytes()
)


class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""

    @staticmethod
    def _tokenize(line):
        """Tokenize one line with a fresh ParserState; return its tokens."""
        parser_state = parser.ParserState("")
        return list(parser_state.tokenize([line]))[0]

    def test_tokenize(self):
        """Accept valid tokens."""
        self.assertEqual(
            [
                (token.type, token.value)
                for token in TokenizerTests._tokenize(
                    "@include /minijail.policy"
                )
            ],
            [
                ("INCLUDE", "@include"),
                ("PATH", "/minijail.policy"),
            ],
        )
        self.assertEqual(
            [
                (token.type, token.value)
                for token in TokenizerTests._tokenize(
                    "@include ./minijail.policy"
                )
            ],
            [
                ("INCLUDE", "@include"),
                ("PATH", "./minijail.policy"),
            ],
        )
        self.assertEqual(
            [
                (token.type, token.value)
                for token in TokenizerTests._tokenize(
                    "read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0755; "
                    "return ENOSYS # ignored"
                )
            ],
            [
                ("IDENTIFIER", "read"),
                ("COLON", ":"),
                ("ARGUMENT", "arg0"),
                ("OP", "in"),
                ("BITWISE_COMPLEMENT", "~"),
                ("NUMERIC_CONSTANT", "0xffff"),
                ("OR", "||"),
                ("ARGUMENT", "arg0"),
                ("OP", "&"),
                ("LPAREN", "("),
                ("NUMERIC_CONSTANT", "1"),
                ("BITWISE_OR", "|"),
                ("NUMERIC_CONSTANT", "2"),
                ("RPAREN", ")"),
                ("AND", "&&"),
                ("ARGUMENT", "arg0"),
                ("OP", "=="),
                ("NUMERIC_CONSTANT", "0755"),
                ("SEMICOLON", ";"),
                ("RETURN", "return"),
                ("IDENTIFIER", "ENOSYS"),
            ],
        )
        # Ensure that tokens that have an otherwise valid token as prefix are
        # still matched correctly.
        self.assertEqual(
            [
                (token.type, token.value)
                for token in TokenizerTests._tokenize(
                    "inotify_wait return_sys killall trace_sys"
                )
            ],
            [
                ("IDENTIFIER", "inotify_wait"),
                ("IDENTIFIER", "return_sys"),
                ("IDENTIFIER", "killall"),
                ("IDENTIFIER", "trace_sys"),
            ],
        )

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        with self.assertRaisesRegex(
            parser.ParseException,
            (
                r"\(1:1\): invalid token\n"
                r" %invalid-token%\n"
                r" \^"
            ),
        ):
            TokenizerTests._tokenize("%invalid-token%")


class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        """Create a fresh allowlist PolicyParser for ARCH_64."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )

    def _tokenize(self, line):
        """Tokenize one line with the parser's own state; return its tokens."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("0x80000000")), 0x80000000
        )
        if self.arch.bits == 64:
            self.assertEqual(
                self.parser.parse_value(self._tokenize("0x8000000000000000")),
                0x8000000000000000,
            )

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(
                parser.ParseException, "unsigned overflow"
            ):
                self.parser.parse_value(self._tokenize("0x100000000"))
        with self.assertRaisesRegex(parser.ParseException, "unsigned overflow"):
            self.parser.parse_value(self._tokenize("0x10000000000000000"))

    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("-1")),
            self.arch.max_unsigned,
        )

    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            # NOTE(review): this literal has nine hex digits; if the intent
            # is the tightest 32-bit boundary it would presumably be
            # -0x80000001 -- confirm before changing.
            with self.assertRaisesRegex(
                parser.ParseException, "signed underflow"
            ):
                self.parser.parse_value(self._tokenize("-0x800000001"))
        with self.assertRaisesRegex(parser.ParseException, "signed underflow"):
            self.parser.parse_value(self._tokenize("-0x8000000000000001"))

    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("0x1|0x2|0x4|0x8")), 0xF
        )

    def test_parse_parenthesized_expressions(self):
        """Accept parsing parenthesized expressions."""
        # Malformed expressions: parsing must raise.
        bad_expressions = [
            "(1",
            "|(1)",
            "(1)|",
            "()",
            "(",
            "((",
            "(()",
            "(()1",
        ]
        for expression in bad_expressions:
            with self.subTest(expression=expression):
                with self.assertRaises(parser.ParseException, msg=expression):
                    self.parser.parse_value(self._tokenize(expression))

        # A valid value followed by trailing garbage: parsing succeeds but
        # must leave the unconsumed tokens in the list.
        bad_partial_expressions = [
            "1)",
            "(1)1",
            "1(0)",
        ]
        for expression in bad_partial_expressions:
            with self.subTest(expression=expression):
                tokens = self._tokenize(expression)
                self.parser.parse_value(tokens)
                self.assertNotEqual(tokens, [])

        # Well-formed expressions: every one of them evaluates to 3.
        good_expressions = [
            "(3)",
            "(1)|2",
            "1|(2)",
            "(1)|(2)",
            "((3))",
            "0|(1|2)",
            "(0|1|2)",
        ]
        for expression in good_expressions:
            with self.subTest(expression=expression):
                self.assertEqual(
                    self.parser.parse_value(self._tokenize(expression)), 3
                )

    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("~0")),
            self.arch.max_unsigned,
        )
        self.assertEqual(
            self.parser.parse_value(self._tokenize("~0|~0")),
            self.arch.max_unsigned,
        )
        if self.arch.bits == 32:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize("~0x005AF0FF|~0xFFA50FFF")
                ),
                0xFFFFFF00,
            )
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize("0x0F|~(0x005AF000|0x00A50FFF)|0xF0")
                ),
                0xFF0000FF,
            )
        else:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize("~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF")
                ),
                0xFFFFFFFFFFFF0000,
            )
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize(
                        "0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00"
                    )
                ),
                0xFFFF00000000FFFF,
            )

    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException, "double complement"):
            self.parser.parse_value(self._tokenize("~~0"))

    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, "empty complement"):
            self.parser.parse_value(self._tokenize("0|~"))

    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        self.assertEqual(self.parser.parse_value(self._tokenize("O_RDONLY")), 0)

    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, "empty constant"):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, "empty constant"):
            self.parser.parse_value(self._tokenize("0|"))

    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, "invalid constant"):
            self.parser.parse_value(self._tokenize("foo"))


class ParseFilterExpressionTests(unittest.TestCase):
    """Tests for PolicyParser.parse_argument_expression."""

    def setUp(self):
        """Create a fresh allowlist PolicyParser for ARCH_64."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )

    def _tokenize(self, line):
        """Tokenize one line with the parser's own state; return its tokens."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize(
                    "arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE"
                )
            ),
            [
                [parser.Atom(0, "in", 0xFFFF)],
                [parser.Atom(0, "==", 4), parser.Atom(1, "==", 2)],
            ],
        )

    def test_parse_number_argument_expression(self):
        """Accept argument expressions with any octal/decimal/hex number."""
        # 4607 == 010777 == 0x11ff
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 4607")
            ),
            [
                [parser.Atom(0, "in", 4607)],
            ],
        )

        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 010777")
            ),
            [
                [parser.Atom(0, "in", 4607)],
            ],
        )

        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 0x11ff")
            ),
            [
                [parser.Atom(0, "in", 4607)],
            ],
        )

    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        with self.assertRaisesRegex(
            parser.ParseException, "empty argument expression"
        ):
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 0xffff ||")
            )

    def test_parse_empty_clause(self):
        """Reject empty clause."""
        with self.assertRaisesRegex(parser.ParseException, "empty clause"):
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 0xffff &&")
            )

    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, "invalid argument"):
            self.parser.parse_argument_expression(
                self._tokenize("argX in 0xffff")
            )

    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        with self.assertRaisesRegex(parser.ParseException, "invalid operator"):
            self.parser.parse_argument_expression(
                self._tokenize("arg0 = 0xffff")
            )

    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, "missing operator"):
            self.parser.parse_argument_expression(self._tokenize("arg0"))

    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, "empty constant"):
            self.parser.parse_argument_expression(self._tokenize("arg0 =="))


class ParseFilterTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter."""

    def setUp(self):
        """Create a fresh allowlist PolicyParser for ARCH_64."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )

    def _tokenize(self, line):
        """Tokenize one line with the parser's own state; return its tokens."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept valid filters."""
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("arg0 == 0")),
            [
                parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("kill-process")),
            [
                parser.Filter(None, bpf.KillProcess()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("kill-thread")),
            [
                parser.Filter(None, bpf.KillThread()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("trap")),
            [
                parser.Filter(None, bpf.Trap()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("return ENOSYS")),
            [
                parser.Filter(
                    None, bpf.ReturnErrno(self.arch.constants["ENOSYS"])
                ),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("trace")),
            [
                parser.Filter(None, bpf.Trace()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("user-notify")),
            [
                parser.Filter(None, bpf.UserNotify()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("log")),
            [
                parser.Filter(None, bpf.Log()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("allow")),
            [
                parser.Filter(None, bpf.Allow()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("1")),
            [
                parser.Filter(None, bpf.Allow()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize("{ arg0 == 0, arg0 == 1; return ENOSYS, trap }")
            ),
            [
                parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                parser.Filter(
                    [[parser.Atom(0, "==", 1)]],
                    bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                ),
                parser.Filter(None, bpf.Trap()),
            ],
        )

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(
            parser.ParseException, "missing return value"
        ):
            self.parser.parse_filter(self._tokenize("return"))

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, "invalid constant"):
            self.parser.parse_filter(self._tokenize("return arg0"))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, "unclosed brace"):
            self.parser.parse_filter(self._tokenize("{ allow"))


class ParseFilterDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter with a denylist policy."""

    def setUp(self):
        """Create a fresh denylist PolicyParser for ARCH_64."""
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True
        )

    def _tokenize(self, line):
        """Tokenize one line with the parser's own state; return its tokens."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept only filters that return an errno."""
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize("arg0 == 0; return ENOSYS")
            ),
            [
                parser.Filter(
                    [[parser.Atom(0, "==", 0)]],
                    bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                ),
            ],
        )


class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""

    def setUp(self):
        """Create a fresh allowlist PolicyParser for ARCH_64."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )

    def _tokenize(self, line):
        """Tokenize one line with the parser's own state; return its tokens."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def assertEqualIgnoringToken(self, actual, expected, msg=None):
        """Similar to assertEqual, but ignores the token field.

        Only the `syscalls` and `filters` attributes of the two statements
        are compared.
        """
        if (
            actual.syscalls != expected.syscalls
            or actual.filters != expected.filters
        ):
            self.fail(f"{actual!r} != {expected!r}: {msg}")

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("read: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("read", 0),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("{read, write}: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall("read", 0),
                    parser.Syscall("write", 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("io@libc: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall("read", 0),
                    parser.Syscall("write", 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("file-io@systemd: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall("read", 0),
                    parser.Syscall("write", 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("kill: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("kill", 62),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )

    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("read[arch=test]: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("read", 0),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize(
                    "{read, nonexistent[arch=nonexistent]}: arg0 == 0"
                )
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("read", 0),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        # NOTE(review): this exercises parse_filter, not
        # parse_filter_statement, and so repeats
        # ParseFilterTests.test_parse_unclosed_brace -- confirm whether that
        # is intentional.
        with self.assertRaisesRegex(parser.ParseException, "unclosed brace"):
            self.parser.parse_filter(self._tokenize("{ allow"))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, "unclosed brace"):
            self.parser.parse_filter_statement(
                self._tokenize("{ read, write: arg0 == 0")
            )

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, "missing colon"):
            self.parser.parse_filter_statement(self._tokenize("read"))

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, "invalid colon"):
            self.parser.parse_filter_statement(self._tokenize("read arg0"))

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, "missing filter"):
            self.parser.parse_filter_statement(self._tokenize("read:"))


class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""

    def setUp(self):
        """Create a fresh allowlist PolicyParser and a scratch directory."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory and everything written into it."""
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing.

        Writes `contents` to `filename` inside the per-test scratch directory
        and returns the resulting path.
        """
        path = os.path.join(self.tempdir, filename)
        with open(path, "w", encoding="utf-8") as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            "test.policy",
            """
            # Comment.
            read: allow
            write: allow
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        # The backslash is escaped so that it reaches the policy file
        # verbatim. An unescaped backslash-newline would be folded away by the
        # Python string literal, leaving no line continuation in the policy
        # for the parser to handle.
        path = self._write_file(
            "test.policy",
            """
            # Comment.
            read: \\
                allow
            write: allow
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            "test.policy",
            """
            @default kill-thread
            read: allow
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            "test.policy",
            """
            @default log
            read: allow

        """,
        )

        with self.assertRaisesRegex(
            parser.ParseException, r"invalid permissive default action"
        ):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow policy files that group syscalls with braces."""
        path = self._write_file(
            "test.policy",
            """
            # Comment.
            {read, write}: allow
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        path = self._write_file(
            "test.policy",
            """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_include(self):
        """Allow including policy files."""
        # Only the top-level policy path is needed below; the included file
        # is found through the relative @include.
        self._write_file(
            "test.include.policy",
            """
            {read, write}: arg0 == 0; allow
        """,
        )
        path = self._write_file(
            "test.policy",
            """
            @include ./test.include.policy
            read: return ENOSYS
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                [[parser.Atom(0, "==", 0)]], bpf.Allow()
                            ),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                [[parser.Atom(0, "==", 0)]], bpf.Allow()
                            ),
                            parser.Filter(None, bpf.KillProcess()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        with self.assertRaisesRegex(
            parser.ParseException, r"empty include path"
        ):
            path = self._write_file(
                "test.policy",
                """
                @include
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(
            parser.ParseException, r"invalid include path"
        ):
            path = self._write_file(
                "test.policy",
                """
                @include arg0
            """,
            )
            self.parser.parse_file(path)

        # A policy that includes itself recurses until the nesting limit.
        with self.assertRaisesRegex(
            parser.ParseException, r"@include statement nested too deep"
        ):
            path = self._write_file(
                "test.policy",
                """
                @include ./test.policy
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(
            parser.ParseException, r"Could not @include .*"
        ):
            path = self._write_file(
                "test.policy",
                """
                @include ./nonexistent.policy
            """,
            )
            self.parser.parse_file(path)

    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            "test.frequency",
            """
            read: 2
            write: 3
        """,
        )
        path = self._write_file(
            "test.policy",
            """
            @frequency ./test.frequency
            read: allow
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        # The policy stays fixed for the first group of checks; only the
        # frequency file it references is rewritten each time.
        path = self._write_file(
            "test.policy", """@frequency ./test.frequency"""
        )

        with self.assertRaisesRegex(parser.ParseException, r"missing colon"):
            self._write_file(
                "test.frequency",
                """
                read
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r"invalid colon"):
            self._write_file(
                "test.frequency",
                """
                read foo
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r"missing number"):
            self._write_file(
                "test.frequency",
                """
                read:
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r"invalid number"):
            self._write_file(
                "test.frequency",
                """
                read: foo
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r"invalid number"):
            self._write_file(
                "test.frequency",
                """
                read: -1
            """,
            )
            self.parser.parse_file(path)

        # From here on the policy file itself carries the broken directive.
        with self.assertRaisesRegex(
            parser.ParseException, r"empty frequency path"
        ):
            path = self._write_file(
                "test.policy",
                """
                @frequency
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(
            parser.ParseException, r"invalid frequency path"
        ):
            path = self._write_file(
                "test.policy",
                """
                @frequency arg0
            """,
            )
            self.parser.parse_file(path)

        with self.assertRaisesRegex(
            parser.ParseException, r"Could not open frequency file.*"
        ):
            path = self._write_file(
                "test.policy",
                """
                @frequency ./nonexistent.frequency
            """,
            )
            self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        # The expected "(3:17)" position depends on the policy lines below
        # keeping their exact 12-space indentation.
        path = self._write_file(
            "test.policy",
            """
            read: allow
            read: allow
        """,
        )

        with self.assertRaisesRegex(
            parser.ParseException,
            (
                r"test.policy\(3:17\): "
                r"Syscall read.*already had an unconditional action "
                r"applied"
            ),
        ):
            self.parser.parse_file(path)

        path = self._write_file(
            "test.policy",
            """
            read: log
            read: arg0 == 0; log
        """,
        )

        with self.assertRaisesRegex(
            parser.ParseException,
            (
                r"test.policy\(3:17\): "
                r"Syscall read.*already had an unconditional action "
                r"applied"
            ),
        ):
            self.parser.parse_file(path)

    def test_parse_allowlist_denylist_header(self):
        """Reject trying to compile denylist policy file as allowlist."""
        with self.assertRaisesRegex(
            parser.ParseException,
            r"policy is denylist, but flag --denylist " "not passed in",
        ):
            path = self._write_file(
                "test.policy",
                """
                @denylist
            """,
            )
            self.parser.parse_file(path)


class ParseFileDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file with a denylist policy."""

    def setUp(self):
        """Create a fresh denylist PolicyParser and a scratch directory."""
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True
        )
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory and everything written into it."""
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing.

        Writes `contents` to `filename` inside the per-test scratch directory
        and returns the resulting path.
        """
        path = os.path.join(self.tempdir, filename)
        with open(path, "w", encoding="utf-8") as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple denylist policy files."""
        path = self._write_file(
            "test.policy",
            """
            # Comment.
            @denylist
            read: return ENOSYS
            write: return ENOSYS
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_simple_with_arg(self):
        """Allow denylist policy files with argument filters."""
        path = self._write_file(
            "test.policy",
            """
            # Comment.
            @denylist
            read: return ENOSYS
            write: arg0 == 0 ; return ENOSYS
        """,
        )

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                [[parser.Atom(0, "==", 0)]],
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )

    def test_parse_denylist_no_header(self):
        """Reject compiling a policy without @denylist as a denylist."""
        with self.assertRaisesRegex(
            parser.ParseException,
            r"policy must contain @denylist flag to be "
            "compiled with --denylist flag",
        ):
            path = self._write_file(
                "test.policy",
                """
                read: return ENOSYS
            """,
            )
            self.parser.parse_file(path)


if __name__ == "__main__":
    unittest.main()