• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for the parser module."""
18
19from __future__ import absolute_import
20from __future__ import division
21from __future__ import print_function
22
23import os
24import shutil
25import tempfile
26import unittest
27
28import arch
29import bpf
30import parser  # pylint: disable=wrong-import-order
31
# Architecture description used by every test case in this module. It is
# loaded once, at import time, from the JSON fixture stored next to this file.
ARCH_64 = arch.Arch.load_from_json(
    os.path.join(os.path.dirname(os.path.abspath(__file__)),
                 'testdata/arch_64.json'))
35
36
class TokenizerTests(unittest.TestCase):
    """Checks for ParserState.tokenize on individual policy lines."""

    @staticmethod
    def _tokenize(line):
        """Tokenize |line| with a fresh ParserState and return its tokens."""
        state = parser.ParserState('<memory>')
        tokenized_lines = list(state.tokenize([line]))
        return tokenized_lines[0]

    @staticmethod
    def _token_pairs(line):
        """Return the (type, value) pair of each token produced for |line|."""
        return [(tok.type, tok.value)
                for tok in TokenizerTests._tokenize(line)]

    def test_tokenize(self):
        """Accept valid tokens."""
        absolute_include = TokenizerTests._token_pairs(
            '@include /minijail.policy')
        self.assertEqual(absolute_include, [
            ('INCLUDE', '@include'),
            ('PATH', '/minijail.policy'),
        ])

        relative_include = TokenizerTests._token_pairs(
            '@include ./minijail.policy')
        self.assertEqual(relative_include, [
            ('INCLUDE', '@include'),
            ('PATH', './minijail.policy'),
        ])

        # The trailing '# ignored' comment must not appear among the tokens.
        statement = TokenizerTests._token_pairs(
            'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0o755; '
            'return ENOSYS # ignored')
        self.assertEqual(statement, [
            ('IDENTIFIER', 'read'),
            ('COLON', ':'),
            ('ARGUMENT', 'arg0'),
            ('OP', 'in'),
            ('BITWISE_COMPLEMENT', '~'),
            ('NUMERIC_CONSTANT', '0xffff'),
            ('OR', '||'),
            ('ARGUMENT', 'arg0'),
            ('OP', '&'),
            ('LPAREN', '('),
            ('NUMERIC_CONSTANT', '1'),
            ('BITWISE_OR', '|'),
            ('NUMERIC_CONSTANT', '2'),
            ('RPAREN', ')'),
            ('AND', '&&'),
            ('ARGUMENT', 'arg0'),
            ('OP', '=='),
            ('NUMERIC_CONSTANT', '0o755'),
            ('SEMICOLON', ';'),
            ('RETURN', 'return'),
            ('IDENTIFIER', 'ENOSYS'),
        ])

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        # The message is expected to carry the location, the offending line
        # and a caret line underneath it.
        expected_message = (r'<memory>\(1:1\): invalid token\n'
                            r'    %invalid-token%\n'
                            r'    \^')
        with self.assertRaisesRegex(parser.ParseException, expected_message):
            TokenizerTests._tokenize('%invalid-token%')
95
96
class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        """Create a fresh PolicyParser for the shared test architecture."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        """Tokenize a single line using the parser's own ParserState."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def _parse_value(self, expression):
        """Tokenize |expression| and hand the tokens to parse_value."""
        return self.parser.parse_value(self._tokenize(expression))

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(self._parse_value('0x80000000'), 0x80000000)
        if self.arch.bits == 64:
            self.assertEqual(
                self._parse_value('0x8000000000000000'), 0x8000000000000000)

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'unsigned overflow'):
                self._parse_value('0x100000000')
        with self.assertRaisesRegex(parser.ParseException,
                                    'unsigned overflow'):
            self._parse_value('0x10000000000000000')

    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        # -1 is expected to come back as the all-ones unsigned word.
        self.assertEqual(self._parse_value('-1'), self.arch.max_unsigned)

    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'signed underflow'):
                # Exactly one below -0x80000000, the smallest 32-bit signed
                # value, so that the boundary itself is what gets checked.
                self._parse_value('-0x80000001')
        with self.assertRaisesRegex(parser.ParseException, 'signed underflow'):
            self._parse_value('-0x8000000000000001')

    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(self._parse_value('0x1|0x2|0x4|0x8'), 0xf)

    def test_parse_parenthesized_expressions(self):
        """Accept well-formed parenthesized expressions, reject malformed ones.
        """
        bad_expressions = [
            '(1',
            '|(1)',
            '(1)|',
            '()',
            '(',
            '((',
            '(()',
            '(()1',
        ]
        for expression in bad_expressions:
            # subTest keeps going after a failure and names the expression.
            with self.subTest(expression=expression):
                with self.assertRaises(parser.ParseException, msg=expression):
                    self._parse_value(expression)

        bad_partial_expressions = [
            '1)',
            '(1)1',
            '1(0)',
        ]
        for expression in bad_partial_expressions:
            with self.subTest(expression=expression):
                tokens = self._tokenize(expression)
                self.parser.parse_value(tokens)
                # Parsing succeeds on a prefix only: the trailing tokens must
                # still be left in the list that was handed in.
                self.assertNotEqual(tokens, [])

        good_expressions = [
            '(3)',
            '(1)|2',
            '1|(2)',
            '(1)|(2)',
            '((3))',
            '0|(1|2)',
            '(0|1|2)',
        ]
        for expression in good_expressions:
            with self.subTest(expression=expression):
                self.assertEqual(self._parse_value(expression), 3)

    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(self._parse_value('~0'), self.arch.max_unsigned)
        self.assertEqual(self._parse_value('~0|~0'), self.arch.max_unsigned)
        if self.arch.bits == 32:
            self.assertEqual(
                self._parse_value('~0x005AF0FF|~0xFFA50FFF'), 0xFFFFFF00)
            self.assertEqual(
                self._parse_value('0x0F|~(0x005AF000|0x00A50FFF)|0xF0'),
                0xFF0000FF)
        else:
            self.assertEqual(
                self._parse_value('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF'),
                0xFFFFFFFFFFFF0000)
            self.assertEqual(
                self._parse_value(
                    '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00'),
                0xFFFF00000000FFFF)

    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'double complement'):
            self._parse_value('~~0')

    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty complement'):
            self._parse_value('0|~')

    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        self.assertEqual(self._parse_value('O_RDONLY'), 0)

    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self._parse_value('0|')

    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self._parse_value('foo')
241
242
class ParseFilterExpressionTests(unittest.TestCase):
    """Checks for PolicyParser.parse_argument_expression."""

    def setUp(self):
        """Create a fresh PolicyParser for the shared test architecture."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        """Tokenize a single line using the parser's own ParserState."""
        # pylint: disable=protected-access
        state = self.parser._parser_state
        return list(state.tokenize([line]))[0]

    def _parse_expression(self, line):
        """Tokenize |line| and run parse_argument_expression on the result."""
        return self.parser.parse_argument_expression(self._tokenize(line))

    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        parsed = self._parse_expression(
            'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE')
        # Outer list: one entry per '||' alternative; inner list: the atoms
        # joined by '&&' within that alternative.
        self.assertEqual(parsed, [
            [parser.Atom(0, 'in', 0xffff)],
            [parser.Atom(0, '==', 4),
             parser.Atom(1, '==', 2)],
        ])

    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'empty argument expression'):
            self._parse_expression('arg0 in 0xffff ||')

    def test_parse_empty_clause(self):
        """Reject empty clause."""
        with self.assertRaisesRegex(parser.ParseException, 'empty clause'):
            self._parse_expression('arg0 in 0xffff &&')

    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid argument'):
            self._parse_expression('argX in 0xffff')

    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid operator'):
            self._parse_expression('arg0 = 0xffff')

    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, 'missing operator'):
            self._parse_expression('arg0')

    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self._parse_expression('arg0 ==')
301
302
class ParseFilterTests(unittest.TestCase):
    """Checks for PolicyParser.parse_filter."""

    def setUp(self):
        """Create a fresh PolicyParser for the shared test architecture."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        """Tokenize a single line using the parser's own ParserState."""
        # pylint: disable=protected-access
        state = self.parser._parser_state
        return list(state.tokenize([line]))[0]

    def _parse(self, line):
        """Tokenize |line| and run parse_filter on the result."""
        return self.parser.parse_filter(self._tokenize(line))

    def test_parse_filter(self):
        """Accept valid filters."""
        # A bare condition implies the Allow action.
        self.assertEqual(
            self._parse('arg0 == 0'),
            [parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow())])

        # Bare actions yield a single unconditional filter.
        self.assertEqual(
            self._parse('kill-process'),
            [parser.Filter(None, bpf.KillProcess())])
        self.assertEqual(
            self._parse('kill-thread'),
            [parser.Filter(None, bpf.KillThread())])
        self.assertEqual(
            self._parse('trap'),
            [parser.Filter(None, bpf.Trap())])
        self.assertEqual(
            self._parse('return ENOSYS'),
            [parser.Filter(None,
                           bpf.ReturnErrno(self.arch.constants['ENOSYS']))])
        self.assertEqual(
            self._parse('trace'),
            [parser.Filter(None, bpf.Trace())])
        self.assertEqual(
            self._parse('log'),
            [parser.Filter(None, bpf.Log())])
        self.assertEqual(
            self._parse('allow'),
            [parser.Filter(None, bpf.Allow())])
        # The literal '1' is accepted as another spelling of 'allow'.
        self.assertEqual(
            self._parse('1'),
            [parser.Filter(None, bpf.Allow())])

        # A braced list produces one filter per comma-separated entry.
        braced = self._parse('{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')
        self.assertEqual(braced, [
            parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            parser.Filter([[parser.Atom(0, '==', 1)]],
                          bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            parser.Filter(None, bpf.Trap()),
        ])

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'missing return value'):
            self._parse('return')

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self._parse('return arg0')

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self._parse('{ allow')
380
381
class ParseFilterStatementTests(unittest.TestCase):
    """Checks for PolicyParser.parse_filter_statement."""

    def setUp(self):
        """Create a fresh PolicyParser for the shared test architecture."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        """Tokenize a single line using the parser's own ParserState."""
        # pylint: disable=protected-access
        state = self.parser._parser_state
        return list(state.tokenize([line]))[0]

    def _parse_statement(self, line):
        """Tokenize |line| and run parse_filter_statement on the result."""
        return self.parser.parse_filter_statement(self._tokenize(line))

    @staticmethod
    def _allow_if_arg0_zero(*syscalls):
        """Build the statement expected for '<syscalls>: arg0 == 0'."""
        return parser.ParsedFilterStatement(
            tuple(syscalls),
            [parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow())])

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        read = parser.Syscall('read', 0)
        write = parser.Syscall('write', 1)

        self.assertEqual(
            self._parse_statement('read: arg0 == 0'),
            self._allow_if_arg0_zero(read))
        self.assertEqual(
            self._parse_statement('{read, write}: arg0 == 0'),
            self._allow_if_arg0_zero(read, write))
        # Both group names below are expected to expand to read and write.
        self.assertEqual(
            self._parse_statement('io@libc: arg0 == 0'),
            self._allow_if_arg0_zero(read, write))
        self.assertEqual(
            self._parse_statement('file-io@systemd: arg0 == 0'),
            self._allow_if_arg0_zero(read, write))

    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        read = parser.Syscall('read', 0)

        self.assertEqual(
            self._parse_statement('read[arch=test]: arg0 == 0'),
            self._allow_if_arg0_zero(read))
        # The entry tagged with a different arch is dropped from the result.
        self.assertEqual(
            self._parse_statement(
                '{read, nonexistent[arch=nonexistent]}: arg0 == 0'),
            self._allow_if_arg0_zero(read))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        # NOTE(review): this calls parse_filter, not parse_filter_statement,
        # and repeats ParseFilterTests.test_parse_unclosed_brace -- confirm
        # whether that is intended.
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self._parse_statement('{ read, write: arg0 == 0')

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, 'missing colon'):
            self._parse_statement('read')

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid colon'):
            self._parse_statement('read arg0')

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, 'missing filter'):
            self._parse_statement('read:')
471
472
class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file.

    Every test writes its policy (and any included or frequency file) into a
    per-test temporary directory that tearDown removes again.
    """

    def setUp(self):
        """Create a fresh PolicyParser and an empty scratch directory."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory and everything written into it."""
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing.

        Args:
            filename: Name of the file, relative to the scratch directory.
            contents: Text to store in the file.

        Returns:
            The full path of the file that was written.
        """
        path = os.path.join(self.tempdir, filename)
        # Pin the encoding so the fixture does not depend on the platform's
        # default locale.
        with open(path, 'w', encoding='utf-8') as outf:
            outf.write(contents)
        return path

    @staticmethod
    def _allow_statement(name, number, frequency=1):
        """Build the FilterStatement expected for an unconditional allow."""
        return parser.FilterStatement(
            syscall=parser.Syscall(name, number),
            frequency=frequency,
            filters=[
                parser.Filter(None, bpf.Allow()),
            ])

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    self._allow_statement('read', 0),
                    self._allow_statement('write', 1),
                ]))

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        # The backslash is doubled on purpose: in a non-raw string literal a
        # single backslash followed by a newline is removed by Python itself,
        # so the policy file would never contain a line continuation.
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: \\
                allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    self._allow_statement('read', 0),
                    self._allow_statement('write', 1),
                ]))

    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            'test.policy', """
            @default kill-thread
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    self._allow_statement('read', 0),
                ]))

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            'test.policy', """
            @default log
            read: allow
        """)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid permissive default action'):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow simple policy files that use a syscall group."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            {read, write}: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    self._allow_statement('read', 0),
                    self._allow_statement('write', 1),
                ]))

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """)

        # The entry tagged with a different arch must not appear in the result.
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    self._allow_statement('write', 1),
                ]))

    def test_parse_include(self):
        """Allow including policy files."""
        # Only the path of the top-level policy is needed; the included file
        # is located through the @include statement.
        self._write_file(
            'test.include.policy', """
            {read, write}: arg0 == 0; allow
        """)
        path = self._write_file(
            'test.policy', """
            @include ./test.include.policy
            read: return ENOSYS
        """)

        arg0_is_zero = [[parser.Atom(0, '==', 0)]]
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    # read: the included conditional filter, then the
                    # unconditional action from the including file.
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(arg0_is_zero, bpf.Allow()),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    # write: no unconditional action was given, so the
                    # default kill action ends its filter list.
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(arg0_is_zero, bpf.Allow()),
                            parser.Filter(None, bpf.KillProcess()),
                        ]),
                ]))

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        bad_policies = (
            (r'empty include path', '@include'),
            (r'invalid include path', '@include arg0'),
            # A policy that includes itself.
            (r'@include statement nested too deep', '@include ./test.policy'),
            (r'Could not @include .*', '@include ./nonexistent.policy'),
        )
        for message, statement in bad_policies:
            with self.subTest(statement=statement):
                # Write the fixture outside the assertion so that only
                # parse_file is expected to raise.
                path = self._write_file('test.policy',
                                        '\n{}\n'.format(statement))
                with self.assertRaisesRegex(parser.ParseException, message):
                    self.parser.parse_file(path)

    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            'test.frequency', """
            read: 2
            write: 3
        """)
        path = self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    self._allow_statement('read', 0, frequency=2),
                ]))

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        # First: a valid policy that points at a malformed frequency file.
        path = self._write_file('test.policy',
                                """@frequency ./test.frequency""")
        bad_frequency_entries = (
            (r'missing colon', 'read'),
            (r'invalid colon', 'read foo'),
            (r'missing number', 'read:'),
            (r'invalid number', 'read: foo'),
            (r'invalid number', 'read: -1'),
        )
        for message, entry in bad_frequency_entries:
            with self.subTest(entry=entry):
                self._write_file('test.frequency', '\n{}\n'.format(entry))
                with self.assertRaisesRegex(parser.ParseException, message):
                    self.parser.parse_file(path)

        # Second: policies whose @frequency statement is itself unusable.
        bad_policies = (
            (r'empty frequency path', '@frequency'),
            (r'invalid frequency path', '@frequency arg0'),
            (r'Could not open frequency file.*',
             '@frequency ./nonexistent.frequency'),
        )
        for message, statement in bad_policies:
            with self.subTest(statement=statement):
                path = self._write_file('test.policy',
                                        '\n{}\n'.format(statement))
                with self.assertRaisesRegex(parser.ParseException, message):
                    self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        path = self._write_file(
            'test.policy', """
            read: allow
            read: allow
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                r'Syscall read.*already had an unconditional action applied'):
            self.parser.parse_file(path)

        # A conditional filter after an unconditional action is rejected too.
        path = self._write_file(
            'test.policy', """
            read: log
            read: arg0 == 0; log
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                r'Syscall read.*already had an unconditional action applied'):
            self.parser.parse_file(path)
811
812
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
815