#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for the parser module."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import shutil
import tempfile
import unittest

import arch
import bpf
import parser  # pylint: disable=wrong-import-order

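# Shared test fixture: a 64-bit Arch description loaded from testdata, which
# supplies the syscall numbers (e.g. read=0, write=1, kill=62) and named
# constants (e.g. ENOSYS, O_RDONLY) that the expected values below rely on.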
ARCH_64 = arch.Arch.load_from_json(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json'))


class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""

    @staticmethod
    def _tokenize(line):
        parser_state = parser.ParserState('<memory>')
        return list(parser_state.tokenize([line]))[0]

    def test_tokenize(self):
        """Accept valid tokens."""
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include /minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', '/minijail.policy'),
        ])
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include ./minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', './minijail.policy'),
        ])
        self.assertEqual(
            [(token.type, token.value) for token in TokenizerTests._tokenize(
                'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0755; '
                'return ENOSYS # ignored')], [
                    ('IDENTIFIER', 'read'),
                    ('COLON', ':'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', 'in'),
                    ('BITWISE_COMPLEMENT', '~'),
                    ('NUMERIC_CONSTANT', '0xffff'),
                    ('OR', '||'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '&'),
                    ('LPAREN', '('),
                    ('NUMERIC_CONSTANT', '1'),
                    ('BITWISE_OR', '|'),
                    ('NUMERIC_CONSTANT', '2'),
                    ('RPAREN', ')'),
                    ('AND', '&&'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '=='),
                    ('NUMERIC_CONSTANT', '0755'),
                    ('SEMICOLON', ';'),
                    ('RETURN', 'return'),
                    ('IDENTIFIER', 'ENOSYS'),
                ])
        # Ensure that tokens that have an otherwise valid token as prefix are
        # still matched correctly.
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize(
                'inotify_wait return_sys killall trace_sys')
        ], [
            ('IDENTIFIER', 'inotify_wait'),
            ('IDENTIFIER', 'return_sys'),
            ('IDENTIFIER', 'killall'),
            ('IDENTIFIER', 'trace_sys'),
        ])

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        with self.assertRaisesRegex(parser.ParseException,
                                    (r'<memory>\(1:1\): invalid token\n'
                                     r'    %invalid-token%\n'
                                     r'    \^')):
            TokenizerTests._tokenize('%invalid-token%')


class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000)
        if self.arch.bits == 64:
            self.assertEqual(
                self.parser.parse_value(self._tokenize('0x8000000000000000')),
                0x8000000000000000)

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'unsigned overflow'):
                self.parser.parse_value(self._tokenize('0x100000000'))
        with self.assertRaisesRegex(parser.ParseException,
                                    'unsigned overflow'):
            self.parser.parse_value(self._tokenize('0x10000000000000000'))

    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('-1')),
            self.arch.max_unsigned)

    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'signed underflow'):
                self.parser.parse_value(self._tokenize('-0x800000001'))
        with self.assertRaisesRegex(parser.ParseException, 'signed underflow'):
            self.parser.parse_value(self._tokenize('-0x8000000000000001'))

    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf)

    def test_parse_parenthesized_expressions(self):
        """Accept parsing parenthesized expressions."""
        bad_expressions = [
            '(1',
            '|(1)',
            '(1)|',
            '()',
            '(',
            '((',
            '(()',
            '(()1',
        ]
        for expression in bad_expressions:
            with self.assertRaises(parser.ParseException, msg=expression):
                self.parser.parse_value(self._tokenize(expression))

        bad_partial_expressions = [
            '1)',
            '(1)1',
            '1(0)',
        ]
        for expression in bad_partial_expressions:
            tokens = self._tokenize(expression)
            self.parser.parse_value(tokens)
            self.assertNotEqual(tokens, [])

        good_expressions = [
            '(3)',
            '(1)|2',
            '1|(2)',
            '(1)|(2)',
            '((3))',
            '0|(1|2)',
            '(0|1|2)',
        ]
        for expression in good_expressions:
            self.assertEqual(
                self.parser.parse_value(self._tokenize(expression)), 3)

    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('~0')),
            self.arch.max_unsigned)
        self.assertEqual(
            self.parser.parse_value(self._tokenize('~0|~0')),
            self.arch.max_unsigned)
        if self.arch.bits == 32:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00)
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')),
                0xFF0000FF)
        else:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')),
                0xFFFFFFFFFFFF0000)
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize(
                        '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00'
                    )), 0xFFFF00000000FFFF)

    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'double complement'):
            self.parser.parse_value(self._tokenize('~~0'))

    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty complement'):
            self.parser.parse_value(self._tokenize('0|~'))

    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('O_RDONLY')), 0)

    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value(self._tokenize('0|'))

    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_value(self._tokenize('foo'))


class ParseFilterExpressionTests(unittest.TestCase):
    """Tests for PolicyParser.parse_argument_expression."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize(
                    'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE'
                )), [
                    [parser.Atom(0, 'in', 0xffff)],
                    [parser.Atom(0, '==', 4),
                     parser.Atom(1, '==', 2)],
                ])

    def test_parse_number_argument_expression(self):
        """Accept valid argument expressions with any octal/decimal/hex number."""
        # 4607 == 010777 == 0x11ff
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 4607')), [
                    [parser.Atom(0, 'in', 4607)],
            ])

        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 010777')), [
                    [parser.Atom(0, 'in', 4607)],
            ])

        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0x11ff')), [
                    [parser.Atom(0, 'in', 4607)],
            ])

    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'empty argument expression'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0xffff ||'))

    def test_parse_empty_clause(self):
        """Reject empty clause."""
        with self.assertRaisesRegex(parser.ParseException, 'empty clause'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0xffff &&'))

    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid argument'):
            self.parser.parse_argument_expression(
                self._tokenize('argX in 0xffff'))

    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid operator'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 = 0xffff'))

    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, 'missing operator'):
            self.parser.parse_argument_expression(self._tokenize('arg0'))

    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_argument_expression(self._tokenize('arg0 =='))


class ParseFilterTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept valid filters."""
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('arg0 == 0')), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-process')), [
                parser.Filter(None, bpf.KillProcess()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-thread')), [
                parser.Filter(None, bpf.KillThread()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trap')), [
                parser.Filter(None, bpf.Trap()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('return ENOSYS')), [
                parser.Filter(None,
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trace')), [
                parser.Filter(None, bpf.Trace()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('user-notify')), [
                parser.Filter(None, bpf.UserNotify()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('log')), [
                parser.Filter(None, bpf.Log()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('allow')), [
                parser.Filter(None, bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('1')), [
                parser.Filter(None, bpf.Allow()),
            ])
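        # Braces group several filters for one syscall: ',' separates filters,
        # ';' attaches an action to the preceding expression, and a filter
        # without an explicit action defaults to allow.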
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize(
                    '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                parser.Filter([[parser.Atom(0, '==', 1)]],
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                parser.Filter(None, bpf.Trap()),
            ])

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'missing return value'):
            self.parser.parse_filter(self._tokenize('return'))

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_filter(self._tokenize('return arg0'))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))


class ParseFilterDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter with a denylist policy."""

    def setUp(self):
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True)

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept only filters that return an errno."""
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('arg0 == 0; return ENOSYS')),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]],
                bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ])


class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

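    # ParsedFilterStatement also records the token at which the statement
    # began (presumably for error reporting); the hand-built expected values
    # below cannot reproduce it, so this helper compares every field except
    # that one.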
    def assertEqualIgnoringToken(self, actual, expected, msg=None):
        """Similar to assertEqual, but ignores the token field."""
        if (actual.syscalls != expected.syscalls or
                actual.filters != expected.filters):
            self.fail(msg or '%r != %r' % (actual, expected))

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('read: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall('read', 0), ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('{read, write}: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
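        # The name@group syntax expands to a group of syscalls; in these tests
        # both io@libc and file-io@systemd expand to read and write.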
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('io@libc: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('file-io@systemd: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('kill: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('kill', 62),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))

    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('read[arch=test]: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
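        # Syscalls annotated with an [arch=...] tag that does not match the
        # current architecture are dropped from the statement.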
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize(
                    '{read, nonexistent[arch=nonexistent]}: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter_statement(
                self._tokenize('{ read, write: arg0 == 0'))

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, 'missing colon'):
            self.parser.parse_filter_statement(self._tokenize('read'))

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid colon'):
            self.parser.parse_filter_statement(self._tokenize('read arg0'))

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, 'missing filter'):
            self.parser.parse_filter_statement(self._tokenize('read:'))


class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: \
                allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            'test.policy', """
            @default kill-thread
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            'test.policy', """
            @default log
            read: allow
        """)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid permissive default action'):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            {read, write}: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_include(self):
        """Allow including policy files."""
        path = self._write_file(
            'test.include.policy', """
            {read, write}: arg0 == 0; allow
        """)
        path = self._write_file(
            'test.policy', """
            @include ./test.include.policy
            read: return ENOSYS
        """)

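        # Filters from the @include'd file are merged in front of the filters
        # declared here; write, which ends up with only a conditional filter,
        # additionally falls back to the parser's kill action.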
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(None, bpf.KillProcess()),
                        ]),
                ]))

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty include path'):
            path = self._write_file(
                'test.policy', """
                @include
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid include path'):
            path = self._write_file(
                'test.policy', """
                @include arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'@include statement nested too deep'):
            path = self._write_file(
                'test.policy', """
                @include ./test.policy
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not @include .*'):
            path = self._write_file(
                'test.policy', """
                @include ./nonexistent.policy
            """)
            self.parser.parse_file(path)

    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            'test.frequency', """
            read: 2
            write: 3
        """)
        path = self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: allow
        """)

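        # Frequencies listed in the @frequency file replace the default
        # per-syscall frequency of 1.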
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        path = self._write_file('test.policy',
                                """@frequency ./test.frequency""")

        with self.assertRaisesRegex(parser.ParseException, r'missing colon'):
            self._write_file('test.frequency', """
                read
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid colon'):
            self._write_file('test.frequency', """
                read foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'missing number'):
            self._write_file('test.frequency', """
                read:
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: -1
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not open frequency file.*'):
            path = self._write_file(
                'test.policy', """
                @frequency ./nonexistent.frequency
            """)
            self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        path = self._write_file(
            'test.policy', """
            read: allow
            read: allow
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                (r'test.policy\(3:17\): '
                 r'Syscall read.*already had an unconditional action '
                 r'applied')):
            self.parser.parse_file(path)

        path = self._write_file(
            'test.policy', """
            read: log
            read: arg0 == 0; log
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                (r'test.policy\(3:17\): '
                 r'Syscall read.*already had an unconditional action '
                 r'applied')):
            self.parser.parse_file(path)

    def test_parse_allowlist_denylist_header(self):
        """Reject trying to compile denylist policy file as allowlist."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'policy is denylist, but flag --denylist '
                                    'not passed in'):
            path = self._write_file(
                'test.policy', """
                @denylist
            """)
            self.parser.parse_file(path)


class ParseFileDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file with a denylist policy."""

    def setUp(self):
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True)
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

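    # In denylist mode the defaults are inverted: the policy's default action
    # is allow, and the listed syscalls are the ones whose filters return an
    # errno.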
    def test_parse_simple(self):
        """Allow simple denylist policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            @denylist
            read: return ENOSYS
            write: return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                ]))

    def test_parse_simple_with_arg(self):
        """Allow denylist policy files with argument filters."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            @denylist
            read: return ENOSYS
            write: arg0 == 0 ; return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_denylist_no_header(self):
        """Reject trying to compile allowlist policy file as denylist."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'policy must contain @denylist flag to be '
                                    'compiled with --denylist flag'):
            path = self._write_file(
                'test.policy', """
                read: return ENOSYS
            """)
            self.parser.parse_file(path)

if __name__ == '__main__':
    unittest.main()