#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for the parser module."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import shutil
import tempfile
import unittest

import arch
import bpf
import parser  # pylint: disable=wrong-import-order

ARCH_64 = arch.Arch.load_from_json(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json'))


class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""

    @staticmethod
    def _tokenize(line):
        parser_state = parser.ParserState('<memory>')
        return list(parser_state.tokenize([line]))[0]

    def test_tokenize(self):
        """Accept valid tokens."""
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include /minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', '/minijail.policy'),
        ])
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include ./minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', './minijail.policy'),
        ])
        self.assertEqual(
            [(token.type, token.value) for token in TokenizerTests._tokenize(
                'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0o755; '
                'return ENOSYS # ignored')], [
                    ('IDENTIFIER', 'read'),
                    ('COLON', ':'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', 'in'),
                    ('BITWISE_COMPLEMENT', '~'),
                    ('NUMERIC_CONSTANT', '0xffff'),
                    ('OR', '||'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '&'),
                    ('LPAREN', '('),
                    ('NUMERIC_CONSTANT', '1'),
                    ('BITWISE_OR', '|'),
                    ('NUMERIC_CONSTANT', '2'),
                    ('RPAREN', ')'),
                    ('AND', '&&'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '=='),
                    ('NUMERIC_CONSTANT', '0o755'),
                    ('SEMICOLON', ';'),
                    ('RETURN', 'return'),
                    ('IDENTIFIER', 'ENOSYS'),
                ])

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        with self.assertRaisesRegex(parser.ParseException,
                                    (r'<memory>\(1:1\): invalid token\n'
                                     r'    %invalid-token%\n'
                                     r'    \^')):
            TokenizerTests._tokenize('%invalid-token%')


class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000)
        if self.arch.bits == 64:
            self.assertEqual(
                self.parser.parse_value(self._tokenize('0x8000000000000000')),
                0x8000000000000000)

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
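        # Values are expected to be bounded by the architecture's word width;
        # the test arch is 64-bit, so anything wider than 64 bits should be
        # rejected with an 'unsigned overflow' error.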
constants.""" 120 if self.arch.bits == 32: 121 with self.assertRaisesRegex(parser.ParseException, 122 'unsigned overflow'): 123 self.parser.parse_value(self._tokenize('0x100000000')) 124 with self.assertRaisesRegex(parser.ParseException, 125 'unsigned overflow'): 126 self.parser.parse_value(self._tokenize('0x10000000000000000')) 127 128 def test_parse_constant_signed(self): 129 """Accept reasonably-sized signed constants.""" 130 self.assertEqual( 131 self.parser.parse_value(self._tokenize('-1')), 132 self.arch.max_unsigned) 133 134 def test_parse_constant_signed_too_negative(self): 135 """Reject unreasonably-sized signed constants.""" 136 if self.arch.bits == 32: 137 with self.assertRaisesRegex(parser.ParseException, 138 'signed underflow'): 139 self.parser.parse_value(self._tokenize('-0x800000001')) 140 with self.assertRaisesRegex(parser.ParseException, 'signed underflow'): 141 self.parser.parse_value(self._tokenize('-0x8000000000000001')) 142 143 def test_parse_mask(self): 144 """Accept parsing a mask value.""" 145 self.assertEqual( 146 self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf) 147 148 def test_parse_parenthesized_expressions(self): 149 """Accept parsing parenthesized expressions.""" 150 bad_expressions = [ 151 '(1', 152 '|(1)', 153 '(1)|', 154 '()', 155 '(', 156 '((', 157 '(()', 158 '(()1', 159 ] 160 for expression in bad_expressions: 161 with self.assertRaises(parser.ParseException, msg=expression): 162 self.parser.parse_value(self._tokenize(expression)) 163 164 bad_partial_expressions = [ 165 '1)', 166 '(1)1', 167 '1(0)', 168 ] 169 for expression in bad_partial_expressions: 170 tokens = self._tokenize(expression) 171 self.parser.parse_value(tokens) 172 self.assertNotEqual(tokens, []) 173 174 good_expressions = [ 175 '(3)', 176 '(1)|2', 177 '1|(2)', 178 '(1)|(2)', 179 '((3))', 180 '0|(1|2)', 181 '(0|1|2)', 182 ] 183 for expression in good_expressions: 184 self.assertEqual( 185 self.parser.parse_value(self._tokenize(expression)), 3) 186 187 def test_parse_constant_complements(self): 188 """Accept complementing constants.""" 189 self.assertEqual( 190 self.parser.parse_value(self._tokenize('~0')), 191 self.arch.max_unsigned) 192 self.assertEqual( 193 self.parser.parse_value(self._tokenize('~0|~0')), 194 self.arch.max_unsigned) 195 if self.arch.bits == 32: 196 self.assertEqual( 197 self.parser.parse_value( 198 self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00) 199 self.assertEqual( 200 self.parser.parse_value( 201 self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')), 202 0xFF0000FF) 203 else: 204 self.assertEqual( 205 self.parser.parse_value( 206 self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')), 207 0xFFFFFFFFFFFF0000) 208 self.assertEqual( 209 self.parser.parse_value( 210 self._tokenize( 211 '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00' 212 )), 0xFFFF00000000FFFF) 213 214 def test_parse_double_complement(self): 215 """Reject double-complementing constants.""" 216 with self.assertRaisesRegex(parser.ParseException, 217 'double complement'): 218 self.parser.parse_value(self._tokenize('~~0')) 219 220 def test_parse_empty_complement(self): 221 """Reject complementing nothing.""" 222 with self.assertRaisesRegex(parser.ParseException, 'empty complement'): 223 self.parser.parse_value(self._tokenize('0|~')) 224 225 def test_parse_named_constant(self): 226 """Accept parsing a named constant.""" 227 self.assertEqual( 228 self.parser.parse_value(self._tokenize('O_RDONLY')), 0) 229 230 def test_parse_empty_constant(self): 231 """Reject parsing 
nothing.""" 232 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 233 self.parser.parse_value([]) 234 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 235 self.parser.parse_value(self._tokenize('0|')) 236 237 def test_parse_invalid_constant(self): 238 """Reject parsing invalid constants.""" 239 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 240 self.parser.parse_value(self._tokenize('foo')) 241 242 243class ParseFilterExpressionTests(unittest.TestCase): 244 """Tests for PolicyParser.parse_argument_expression.""" 245 246 def setUp(self): 247 self.arch = ARCH_64 248 self.parser = parser.PolicyParser( 249 self.arch, kill_action=bpf.KillProcess()) 250 251 def _tokenize(self, line): 252 # pylint: disable=protected-access 253 return list(self.parser._parser_state.tokenize([line]))[0] 254 255 def test_parse_argument_expression(self): 256 """Accept valid argument expressions.""" 257 self.assertEqual( 258 self.parser.parse_argument_expression( 259 self._tokenize( 260 'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE' 261 )), [ 262 [parser.Atom(0, 'in', 0xffff)], 263 [parser.Atom(0, '==', 4), 264 parser.Atom(1, '==', 2)], 265 ]) 266 267 def test_parse_empty_argument_expression(self): 268 """Reject empty argument expressions.""" 269 with self.assertRaisesRegex(parser.ParseException, 270 'empty argument expression'): 271 self.parser.parse_argument_expression( 272 self._tokenize('arg0 in 0xffff ||')) 273 274 def test_parse_empty_clause(self): 275 """Reject empty clause.""" 276 with self.assertRaisesRegex(parser.ParseException, 'empty clause'): 277 self.parser.parse_argument_expression( 278 self._tokenize('arg0 in 0xffff &&')) 279 280 def test_parse_invalid_argument(self): 281 """Reject invalid argument.""" 282 with self.assertRaisesRegex(parser.ParseException, 'invalid argument'): 283 self.parser.parse_argument_expression( 284 self._tokenize('argX in 0xffff')) 285 286 def test_parse_invalid_operator(self): 287 """Reject invalid operator.""" 288 with self.assertRaisesRegex(parser.ParseException, 'invalid operator'): 289 self.parser.parse_argument_expression( 290 self._tokenize('arg0 = 0xffff')) 291 292 def test_parse_missing_operator(self): 293 """Reject missing operator.""" 294 with self.assertRaisesRegex(parser.ParseException, 'missing operator'): 295 self.parser.parse_argument_expression(self._tokenize('arg0')) 296 297 def test_parse_missing_operand(self): 298 """Reject missing operand.""" 299 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 300 self.parser.parse_argument_expression(self._tokenize('arg0 ==')) 301 302 303class ParseFilterTests(unittest.TestCase): 304 """Tests for PolicyParser.parse_filter.""" 305 306 def setUp(self): 307 self.arch = ARCH_64 308 self.parser = parser.PolicyParser( 309 self.arch, kill_action=bpf.KillProcess()) 310 311 def _tokenize(self, line): 312 # pylint: disable=protected-access 313 return list(self.parser._parser_state.tokenize([line]))[0] 314 315 def test_parse_filter(self): 316 """Accept valid filters.""" 317 self.assertEqual( 318 self.parser.parse_filter(self._tokenize('arg0 == 0')), [ 319 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 320 ]) 321 self.assertEqual( 322 self.parser.parse_filter(self._tokenize('kill-process')), [ 323 parser.Filter(None, bpf.KillProcess()), 324 ]) 325 self.assertEqual( 326 self.parser.parse_filter(self._tokenize('kill-thread')), [ 327 parser.Filter(None, bpf.KillThread()), 328 ]) 329 self.assertEqual( 330 
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('arg0 == 0')), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-process')), [
                parser.Filter(None, bpf.KillProcess()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-thread')), [
                parser.Filter(None, bpf.KillThread()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trap')), [
                parser.Filter(None, bpf.Trap()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('return ENOSYS')), [
                parser.Filter(None,
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trace')), [
                parser.Filter(None, bpf.Trace()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('log')), [
                parser.Filter(None, bpf.Log()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('allow')), [
                parser.Filter(None, bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('1')), [
                parser.Filter(None, bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize(
                    '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                parser.Filter([[parser.Atom(0, '==', 1)]],
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                parser.Filter(None, bpf.Trap()),
            ])

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'missing return value'):
            self.parser.parse_filter(self._tokenize('return'))

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_filter(self._tokenize('return arg0'))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))


class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        self.assertEqual(
            self.parser.parse_filter_statement(
                self._tokenize('read: arg0 == 0')),
            parser.ParsedFilterStatement((parser.Syscall('read', 0), ), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ]))
        self.assertEqual(
            self.parser.parse_filter_statement(
                self._tokenize('{read, write}: arg0 == 0')),
            parser.ParsedFilterStatement((
                parser.Syscall('read', 0),
                parser.Syscall('write', 1),
            ), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ]))
        self.assertEqual(
            self.parser.parse_filter_statement(
                self._tokenize('io@libc: arg0 == 0')),
            parser.ParsedFilterStatement((
                parser.Syscall('read', 0),
                parser.Syscall('write', 1),
            ), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ]))
        self.assertEqual(
            self.parser.parse_filter_statement(
                self._tokenize('file-io@systemd: arg0 == 0')),
            parser.ParsedFilterStatement((
                parser.Syscall('read', 0),
                parser.Syscall('write', 1),
            ), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ]))

    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
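        # Square-bracket metadata such as [arch=...] restricts an entry to the
        # named architecture; syscalls tagged with a non-matching arch are
        # expected to be dropped from the parsed statement.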
        self.assertEqual(
            self.parser.parse_filter_statement(
                self._tokenize('read[arch=test]: arg0 == 0')),
            parser.ParsedFilterStatement((parser.Syscall('read', 0), ), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ]))
        self.assertEqual(
            self.parser.parse_filter_statement(
                self._tokenize(
                    '{read, nonexistent[arch=nonexistent]}: arg0 == 0')),
            parser.ParsedFilterStatement((parser.Syscall('read', 0), ), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ]))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter_statement(
                self._tokenize('{ read, write: arg0 == 0'))

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, 'missing colon'):
            self.parser.parse_filter_statement(self._tokenize('read'))

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid colon'):
            self.parser.parse_filter_statement(self._tokenize('read arg0'))

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, 'missing filter'):
            self.parser.parse_filter_statement(self._tokenize('read:'))


class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
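        # The trailing backslash continues the read statement onto the next
        # line; the result is expected to parse exactly like test_parse_simple.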
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: \
                allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            'test.policy', """
            @default kill-thread
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            'test.policy', """
            @default log
            read: allow
        """)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid permissive default action'):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow simple grouped policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            {read, write}: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_include(self):
        """Allow including policy files."""
        path = self._write_file(
            'test.include.policy', """
            {read, write}: arg0 == 0; allow
        """)
        path = self._write_file(
            'test.policy', """
            @include ./test.include.policy
            read: return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(None, bpf.KillProcess()),
                        ]),
                ]))

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty include path'):
            path = self._write_file(
                'test.policy', """
                @include
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid include path'):
            path = self._write_file(
                'test.policy', """
                @include arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'@include statement nested too deep'):
            path = self._write_file(
                'test.policy', """
                @include ./test.policy
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not @include .*'):
            path = self._write_file(
                'test.policy', """
                @include ./nonexistent.policy
            """)
            self.parser.parse_file(path)

    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            'test.frequency', """
            read: 2
            write: 3
        """)
        path = self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        path = self._write_file('test.policy',
                                """@frequency ./test.frequency""")

        with self.assertRaisesRegex(parser.ParseException, r'missing colon'):
            self._write_file('test.frequency', """
                read
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid colon'):
            self._write_file('test.frequency', """
                read foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'missing number'):
            self._write_file('test.frequency', """
                read:
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: -1
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not open frequency file.*'):
            path = self._write_file(
                'test.policy', """
                @frequency ./nonexistent.frequency
            """)
            self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        path = self._write_file(
            'test.policy', """
            read: allow
            read: allow
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                r'Syscall read.*already had an unconditional action applied'):
            self.parser.parse_file(path)

        path = self._write_file(
            'test.policy', """
            read: log
            read: arg0 == 0; log
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                r'Syscall read.*already had an unconditional action applied'):
            self.parser.parse_file(path)


if __name__ == '__main__':
    unittest.main()