#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for the parser module."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import shutil
import tempfile
import unittest

import arch
import bpf
import parser  # pylint: disable=wrong-import-order

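# 64-bit test architecture definition (testdata/arch_64.json) shared by all
# test cases below.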
ARCH_64 = arch.Arch.load_from_json(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json'))


class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""

    @staticmethod
    def _tokenize(line):
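        # Tokenize a single line with a throwaway in-memory ParserState and
        # return the tokens for that line.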
        parser_state = parser.ParserState('<memory>')
        return list(parser_state.tokenize([line]))[0]

    def test_tokenize(self):
        """Accept valid tokens."""
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include /minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', '/minijail.policy'),
        ])
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include ./minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', './minijail.policy'),
        ])
        self.assertEqual(
            [(token.type, token.value) for token in TokenizerTests._tokenize(
                'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0o755; '
                'return ENOSYS # ignored')], [
                    ('IDENTIFIER', 'read'),
                    ('COLON', ':'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', 'in'),
                    ('BITWISE_COMPLEMENT', '~'),
                    ('NUMERIC_CONSTANT', '0xffff'),
                    ('OR', '||'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '&'),
                    ('LPAREN', '('),
                    ('NUMERIC_CONSTANT', '1'),
                    ('BITWISE_OR', '|'),
                    ('NUMERIC_CONSTANT', '2'),
                    ('RPAREN', ')'),
                    ('AND', '&&'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '=='),
                    ('NUMERIC_CONSTANT', '0o755'),
                    ('SEMICOLON', ';'),
                    ('RETURN', 'return'),
                    ('IDENTIFIER', 'ENOSYS'),
                ])
        # Ensure that tokens that have an otherwise valid token as prefix are
        # still matched correctly.
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize(
                'inotify_wait return_sys killall trace_sys')
        ], [
            ('IDENTIFIER', 'inotify_wait'),
            ('IDENTIFIER', 'return_sys'),
            ('IDENTIFIER', 'killall'),
            ('IDENTIFIER', 'trace_sys'),
        ])

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        with self.assertRaisesRegex(parser.ParseException,
                                    (r'<memory>\(1:1\): invalid token\n'
                                     r'    %invalid-token%\n'
                                     r'    \^')):
            TokenizerTests._tokenize('%invalid-token%')


class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000)
        if self.arch.bits == 64:
            self.assertEqual(
                self.parser.parse_value(self._tokenize('0x8000000000000000')),
                0x8000000000000000)

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'unsigned overflow'):
                self.parser.parse_value(self._tokenize('0x100000000'))
        with self.assertRaisesRegex(parser.ParseException,
                                    'unsigned overflow'):
            self.parser.parse_value(self._tokenize('0x10000000000000000'))

    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('-1')),
            self.arch.max_unsigned)

    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'signed underflow'):
                self.parser.parse_value(self._tokenize('-0x800000001'))
        with self.assertRaisesRegex(parser.ParseException, 'signed underflow'):
            self.parser.parse_value(self._tokenize('-0x8000000000000001'))

    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf)

    def test_parse_parenthesized_expressions(self):
        """Accept parsing parenthesized expressions."""
        bad_expressions = [
            '(1',
            '|(1)',
            '(1)|',
            '()',
            '(',
            '((',
            '(()',
            '(()1',
        ]
        for expression in bad_expressions:
            with self.assertRaises(parser.ParseException, msg=expression):
                self.parser.parse_value(self._tokenize(expression))

        bad_partial_expressions = [
            '1)',
            '(1)1',
            '1(0)',
        ]
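        # These expressions parse, but must leave their trailing tokens
        # unconsumed rather than silently discarding them.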
        for expression in bad_partial_expressions:
            tokens = self._tokenize(expression)
            self.parser.parse_value(tokens)
            self.assertNotEqual(tokens, [])

        good_expressions = [
            '(3)',
            '(1)|2',
            '1|(2)',
            '(1)|(2)',
            '((3))',
            '0|(1|2)',
            '(0|1|2)',
        ]
        for expression in good_expressions:
            self.assertEqual(
                self.parser.parse_value(self._tokenize(expression)), 3)

    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('~0')),
            self.arch.max_unsigned)
        self.assertEqual(
            self.parser.parse_value(self._tokenize('~0|~0')),
            self.arch.max_unsigned)
        if self.arch.bits == 32:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00)
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')),
                0xFF0000FF)
        else:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')),
                0xFFFFFFFFFFFF0000)
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize(
                        '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00'
                    )), 0xFFFF00000000FFFF)

    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'double complement'):
            self.parser.parse_value(self._tokenize('~~0'))

    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty complement'):
            self.parser.parse_value(self._tokenize('0|~'))

    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('O_RDONLY')), 0)

    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value(self._tokenize('0|'))

    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_value(self._tokenize('foo'))


class ParseFilterExpressionTests(unittest.TestCase):
    """Tests for PolicyParser.parse_argument_expression."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize(
                    'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE'
                )), [
                    [parser.Atom(0, 'in', 0xffff)],
                    [parser.Atom(0, '==', 4),
                     parser.Atom(1, '==', 2)],
                ])

    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'empty argument expression'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0xffff ||'))

    def test_parse_empty_clause(self):
        """Reject empty clause."""
        with self.assertRaisesRegex(parser.ParseException, 'empty clause'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0xffff &&'))

    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid argument'):
            self.parser.parse_argument_expression(
                self._tokenize('argX in 0xffff'))

    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid operator'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 = 0xffff'))

    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, 'missing operator'):
            self.parser.parse_argument_expression(self._tokenize('arg0'))

    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_argument_expression(self._tokenize('arg0 =='))


class ParseFilterTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept valid filters."""
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('arg0 == 0')), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-process')), [
                parser.Filter(None, bpf.KillProcess()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-thread')), [
                parser.Filter(None, bpf.KillThread()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trap')), [
                parser.Filter(None, bpf.Trap()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('return ENOSYS')), [
                parser.Filter(None,
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trace')), [
                parser.Filter(None, bpf.Trace()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('log')), [
                parser.Filter(None, bpf.Log()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('allow')), [
                parser.Filter(None, bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('1')), [
                parser.Filter(None, bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize(
                    '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                parser.Filter([[parser.Atom(0, '==', 1)]],
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                parser.Filter(None, bpf.Trap()),
            ])

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'missing return value'):
            self.parser.parse_filter(self._tokenize('return'))

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_filter(self._tokenize('return arg0'))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))


class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def assertEqualIgnoringToken(self, actual, expected, msg=None):
        """Similar to assertEqual, but ignores the token field."""
        if (actual.syscalls != expected.syscalls or
                actual.filters != expected.filters):
            self.fail(msg or '%r != %r' % (actual, expected))

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('read: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall('read', 0), ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('{read, write}: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
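        # Group syscalls (io@libc and file-io@systemd below) are expected to
        # expand to their member syscalls as defined in the test arch data.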
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('io@libc: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('file-io@systemd: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('kill: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('kill', 62),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))

    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('read[arch=test]: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize(
                    '{read, nonexistent[arch=nonexistent]}: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter_statement(
                self._tokenize('{ read, write: arg0 == 0'))

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, 'missing colon'):
            self.parser.parse_filter_statement(self._tokenize('read'))

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid colon'):
            self.parser.parse_filter_statement(self._tokenize('read arg0'))

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, 'missing filter'):
            self.parser.parse_filter_statement(self._tokenize('read:'))


class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: \
                allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))


    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            'test.policy', """
            @default kill-thread
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            'test.policy', """
            @default log
            read: allow
        """)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid permissive default action'):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow simple policy files with grouped syscalls."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            {read, write}: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_include(self):
        """Allow including policy files."""
        path = self._write_file(
            'test.include.policy', """
            {read, write}: arg0 == 0; allow
        """)
        path = self._write_file(
            'test.policy', """
            @include ./test.include.policy
            read: return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
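                    # 'write' gets no unconditional action from the policy, so
                    # the parser's kill_action terminates its filter chain.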
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(None, bpf.KillProcess()),
                        ]),
                ]))

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty include path'):
            path = self._write_file(
                'test.policy', """
                @include
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid include path'):
            path = self._write_file(
                'test.policy', """
                @include arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'@include statement nested too deep'):
            path = self._write_file(
                'test.policy', """
                @include ./test.policy
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not @include .*'):
            path = self._write_file(
                'test.policy', """
                @include ./nonexistent.policy
            """)
            self.parser.parse_file(path)

    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            'test.frequency', """
            read: 2
            write: 3
        """)
        path = self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        path = self._write_file('test.policy',
                                """@frequency ./test.frequency""")

        with self.assertRaisesRegex(parser.ParseException, r'missing colon'):
            self._write_file('test.frequency', """
                read
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid colon'):
            self._write_file('test.frequency', """
                read foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'missing number'):
            self._write_file('test.frequency', """
                read:
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: -1
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not open frequency file.*'):
            path = self._write_file(
                'test.policy', """
                @frequency ./nonexistent.frequency
            """)
            self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        path = self._write_file(
            'test.policy', """
            read: allow
            read: allow
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                (r'test.policy\(3:17\): '
                 r'Syscall read.*already had an unconditional action '
                 r'applied')):
            self.parser.parse_file(path)

        path = self._write_file(
            'test.policy', """
            read: log
            read: arg0 == 0; log
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                (r'test.policy\(3:17\): '
                 r'Syscall read.*already had an unconditional action '
                 r'applied')):
            self.parser.parse_file(path)


if __name__ == '__main__':
    unittest.main()
