1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2018 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17"""Unittests for the parser module.""" 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import print_function 22 23import os 24import shutil 25import tempfile 26import unittest 27 28import arch 29import bpf 30import parser # pylint: disable=wrong-import-order 31 32ARCH_64 = arch.Arch.load_from_json( 33 os.path.join( 34 os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json')) 35 36 37class TokenizerTests(unittest.TestCase): 38 """Tests for ParserState.tokenize.""" 39 40 @staticmethod 41 def _tokenize(line): 42 parser_state = parser.ParserState('<memory>') 43 return list(parser_state.tokenize([line]))[0] 44 45 def test_tokenize(self): 46 """Accept valid tokens.""" 47 self.assertEqual([ 48 (token.type, token.value) 49 for token in TokenizerTests._tokenize('@include /minijail.policy') 50 ], [ 51 ('INCLUDE', '@include'), 52 ('PATH', '/minijail.policy'), 53 ]) 54 self.assertEqual([ 55 (token.type, token.value) 56 for token in TokenizerTests._tokenize('@include ./minijail.policy') 57 ], [ 58 ('INCLUDE', '@include'), 59 ('PATH', './minijail.policy'), 60 ]) 61 self.assertEqual( 62 [(token.type, token.value) for token in TokenizerTests._tokenize( 63 'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0o755; ' 64 'return ENOSYS # ignored')], [ 65 ('IDENTIFIER', 'read'), 66 ('COLON', ':'), 67 ('ARGUMENT', 'arg0'), 68 ('OP', 'in'), 69 ('BITWISE_COMPLEMENT', '~'), 70 ('NUMERIC_CONSTANT', '0xffff'), 71 ('OR', '||'), 72 ('ARGUMENT', 'arg0'), 73 ('OP', '&'), 74 ('LPAREN', '('), 75 ('NUMERIC_CONSTANT', '1'), 76 ('BITWISE_OR', '|'), 77 ('NUMERIC_CONSTANT', '2'), 78 ('RPAREN', ')'), 79 ('AND', '&&'), 80 ('ARGUMENT', 'arg0'), 81 ('OP', '=='), 82 ('NUMERIC_CONSTANT', '0o755'), 83 ('SEMICOLON', ';'), 84 ('RETURN', 'return'), 85 ('IDENTIFIER', 'ENOSYS'), 86 ]) 87 # Ensure that tokens that have an otherwise valid token as prefix are 88 # still matched correctly. 89 self.assertEqual([ 90 (token.type, token.value) 91 for token in TokenizerTests._tokenize( 92 'inotify_wait return_sys killall trace_sys') 93 ], [ 94 ('IDENTIFIER', 'inotify_wait'), 95 ('IDENTIFIER', 'return_sys'), 96 ('IDENTIFIER', 'killall'), 97 ('IDENTIFIER', 'trace_sys'), 98 ]) 99 100 def test_tokenize_invalid_token(self): 101 """Reject tokenizer errors.""" 102 with self.assertRaisesRegex(parser.ParseException, 103 (r'<memory>\(1:1\): invalid token\n' 104 r' %invalid-token%\n' 105 r' \^')): 106 TokenizerTests._tokenize('%invalid-token%') 107 108 109class ParseConstantTests(unittest.TestCase): 110 """Tests for PolicyParser.parse_value.""" 111 112 def setUp(self): 113 self.arch = ARCH_64 114 self.parser = parser.PolicyParser( 115 self.arch, kill_action=bpf.KillProcess()) 116 117 def _tokenize(self, line): 118 # pylint: disable=protected-access 119 return list(self.parser._parser_state.tokenize([line]))[0] 120 121 def test_parse_constant_unsigned(self): 122 """Accept reasonably-sized unsigned constants.""" 123 self.assertEqual( 124 self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000) 125 if self.arch.bits == 64: 126 self.assertEqual( 127 self.parser.parse_value(self._tokenize('0x8000000000000000')), 128 0x8000000000000000) 129 130 def test_parse_constant_unsigned_too_big(self): 131 """Reject unreasonably-sized unsigned constants.""" 132 if self.arch.bits == 32: 133 with self.assertRaisesRegex(parser.ParseException, 134 'unsigned overflow'): 135 self.parser.parse_value(self._tokenize('0x100000000')) 136 with self.assertRaisesRegex(parser.ParseException, 137 'unsigned overflow'): 138 self.parser.parse_value(self._tokenize('0x10000000000000000')) 139 140 def test_parse_constant_signed(self): 141 """Accept reasonably-sized signed constants.""" 142 self.assertEqual( 143 self.parser.parse_value(self._tokenize('-1')), 144 self.arch.max_unsigned) 145 146 def test_parse_constant_signed_too_negative(self): 147 """Reject unreasonably-sized signed constants.""" 148 if self.arch.bits == 32: 149 with self.assertRaisesRegex(parser.ParseException, 150 'signed underflow'): 151 self.parser.parse_value(self._tokenize('-0x800000001')) 152 with self.assertRaisesRegex(parser.ParseException, 'signed underflow'): 153 self.parser.parse_value(self._tokenize('-0x8000000000000001')) 154 155 def test_parse_mask(self): 156 """Accept parsing a mask value.""" 157 self.assertEqual( 158 self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf) 159 160 def test_parse_parenthesized_expressions(self): 161 """Accept parsing parenthesized expressions.""" 162 bad_expressions = [ 163 '(1', 164 '|(1)', 165 '(1)|', 166 '()', 167 '(', 168 '((', 169 '(()', 170 '(()1', 171 ] 172 for expression in bad_expressions: 173 with self.assertRaises(parser.ParseException, msg=expression): 174 self.parser.parse_value(self._tokenize(expression)) 175 176 bad_partial_expressions = [ 177 '1)', 178 '(1)1', 179 '1(0)', 180 ] 181 for expression in bad_partial_expressions: 182 tokens = self._tokenize(expression) 183 self.parser.parse_value(tokens) 184 self.assertNotEqual(tokens, []) 185 186 good_expressions = [ 187 '(3)', 188 '(1)|2', 189 '1|(2)', 190 '(1)|(2)', 191 '((3))', 192 '0|(1|2)', 193 '(0|1|2)', 194 ] 195 for expression in good_expressions: 196 self.assertEqual( 197 self.parser.parse_value(self._tokenize(expression)), 3) 198 199 def test_parse_constant_complements(self): 200 """Accept complementing constants.""" 201 self.assertEqual( 202 self.parser.parse_value(self._tokenize('~0')), 203 self.arch.max_unsigned) 204 self.assertEqual( 205 self.parser.parse_value(self._tokenize('~0|~0')), 206 self.arch.max_unsigned) 207 if self.arch.bits == 32: 208 self.assertEqual( 209 self.parser.parse_value( 210 self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00) 211 self.assertEqual( 212 self.parser.parse_value( 213 self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')), 214 0xFF0000FF) 215 else: 216 self.assertEqual( 217 self.parser.parse_value( 218 self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')), 219 0xFFFFFFFFFFFF0000) 220 self.assertEqual( 221 self.parser.parse_value( 222 self._tokenize( 223 '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00' 224 )), 0xFFFF00000000FFFF) 225 226 def test_parse_double_complement(self): 227 """Reject double-complementing constants.""" 228 with self.assertRaisesRegex(parser.ParseException, 229 'double complement'): 230 self.parser.parse_value(self._tokenize('~~0')) 231 232 def test_parse_empty_complement(self): 233 """Reject complementing nothing.""" 234 with self.assertRaisesRegex(parser.ParseException, 'empty complement'): 235 self.parser.parse_value(self._tokenize('0|~')) 236 237 def test_parse_named_constant(self): 238 """Accept parsing a named constant.""" 239 self.assertEqual( 240 self.parser.parse_value(self._tokenize('O_RDONLY')), 0) 241 242 def test_parse_empty_constant(self): 243 """Reject parsing nothing.""" 244 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 245 self.parser.parse_value([]) 246 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 247 self.parser.parse_value(self._tokenize('0|')) 248 249 def test_parse_invalid_constant(self): 250 """Reject parsing invalid constants.""" 251 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 252 self.parser.parse_value(self._tokenize('foo')) 253 254 255class ParseFilterExpressionTests(unittest.TestCase): 256 """Tests for PolicyParser.parse_argument_expression.""" 257 258 def setUp(self): 259 self.arch = ARCH_64 260 self.parser = parser.PolicyParser( 261 self.arch, kill_action=bpf.KillProcess()) 262 263 def _tokenize(self, line): 264 # pylint: disable=protected-access 265 return list(self.parser._parser_state.tokenize([line]))[0] 266 267 def test_parse_argument_expression(self): 268 """Accept valid argument expressions.""" 269 self.assertEqual( 270 self.parser.parse_argument_expression( 271 self._tokenize( 272 'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE' 273 )), [ 274 [parser.Atom(0, 'in', 0xffff)], 275 [parser.Atom(0, '==', 4), 276 parser.Atom(1, '==', 2)], 277 ]) 278 279 def test_parse_empty_argument_expression(self): 280 """Reject empty argument expressions.""" 281 with self.assertRaisesRegex(parser.ParseException, 282 'empty argument expression'): 283 self.parser.parse_argument_expression( 284 self._tokenize('arg0 in 0xffff ||')) 285 286 def test_parse_empty_clause(self): 287 """Reject empty clause.""" 288 with self.assertRaisesRegex(parser.ParseException, 'empty clause'): 289 self.parser.parse_argument_expression( 290 self._tokenize('arg0 in 0xffff &&')) 291 292 def test_parse_invalid_argument(self): 293 """Reject invalid argument.""" 294 with self.assertRaisesRegex(parser.ParseException, 'invalid argument'): 295 self.parser.parse_argument_expression( 296 self._tokenize('argX in 0xffff')) 297 298 def test_parse_invalid_operator(self): 299 """Reject invalid operator.""" 300 with self.assertRaisesRegex(parser.ParseException, 'invalid operator'): 301 self.parser.parse_argument_expression( 302 self._tokenize('arg0 = 0xffff')) 303 304 def test_parse_missing_operator(self): 305 """Reject missing operator.""" 306 with self.assertRaisesRegex(parser.ParseException, 'missing operator'): 307 self.parser.parse_argument_expression(self._tokenize('arg0')) 308 309 def test_parse_missing_operand(self): 310 """Reject missing operand.""" 311 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 312 self.parser.parse_argument_expression(self._tokenize('arg0 ==')) 313 314 315class ParseFilterTests(unittest.TestCase): 316 """Tests for PolicyParser.parse_filter.""" 317 318 def setUp(self): 319 self.arch = ARCH_64 320 self.parser = parser.PolicyParser( 321 self.arch, kill_action=bpf.KillProcess()) 322 323 def _tokenize(self, line): 324 # pylint: disable=protected-access 325 return list(self.parser._parser_state.tokenize([line]))[0] 326 327 def test_parse_filter(self): 328 """Accept valid filters.""" 329 self.assertEqual( 330 self.parser.parse_filter(self._tokenize('arg0 == 0')), [ 331 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 332 ]) 333 self.assertEqual( 334 self.parser.parse_filter(self._tokenize('kill-process')), [ 335 parser.Filter(None, bpf.KillProcess()), 336 ]) 337 self.assertEqual( 338 self.parser.parse_filter(self._tokenize('kill-thread')), [ 339 parser.Filter(None, bpf.KillThread()), 340 ]) 341 self.assertEqual( 342 self.parser.parse_filter(self._tokenize('trap')), [ 343 parser.Filter(None, bpf.Trap()), 344 ]) 345 self.assertEqual( 346 self.parser.parse_filter(self._tokenize('return ENOSYS')), [ 347 parser.Filter(None, 348 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 349 ]) 350 self.assertEqual( 351 self.parser.parse_filter(self._tokenize('trace')), [ 352 parser.Filter(None, bpf.Trace()), 353 ]) 354 self.assertEqual( 355 self.parser.parse_filter(self._tokenize('log')), [ 356 parser.Filter(None, bpf.Log()), 357 ]) 358 self.assertEqual( 359 self.parser.parse_filter(self._tokenize('allow')), [ 360 parser.Filter(None, bpf.Allow()), 361 ]) 362 self.assertEqual( 363 self.parser.parse_filter(self._tokenize('1')), [ 364 parser.Filter(None, bpf.Allow()), 365 ]) 366 self.assertEqual( 367 self.parser.parse_filter( 368 self._tokenize( 369 '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')), 370 [ 371 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 372 parser.Filter([[parser.Atom(0, '==', 1)]], 373 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 374 parser.Filter(None, bpf.Trap()), 375 ]) 376 377 def test_parse_missing_return_value(self): 378 """Reject missing return value.""" 379 with self.assertRaisesRegex(parser.ParseException, 380 'missing return value'): 381 self.parser.parse_filter(self._tokenize('return')) 382 383 def test_parse_invalid_return_value(self): 384 """Reject invalid return value.""" 385 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 386 self.parser.parse_filter(self._tokenize('return arg0')) 387 388 def test_parse_unclosed_brace(self): 389 """Reject unclosed brace.""" 390 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 391 self.parser.parse_filter(self._tokenize('{ allow')) 392 393 394class ParseFilterStatementTests(unittest.TestCase): 395 """Tests for PolicyParser.parse_filter_statement.""" 396 397 def setUp(self): 398 self.arch = ARCH_64 399 self.parser = parser.PolicyParser( 400 self.arch, kill_action=bpf.KillProcess()) 401 402 def _tokenize(self, line): 403 # pylint: disable=protected-access 404 return list(self.parser._parser_state.tokenize([line]))[0] 405 406 def assertEqualIgnoringToken(self, actual, expected, msg=None): 407 """Similar to assertEqual, but ignores the token field.""" 408 if (actual.syscalls != expected.syscalls or 409 actual.filters != expected.filters): 410 self.fail('%r != %r' % (actual, expected), msg) 411 412 def test_parse_filter_statement(self): 413 """Accept valid filter statements.""" 414 self.assertEqualIgnoringToken( 415 self.parser.parse_filter_statement( 416 self._tokenize('read: arg0 == 0')), 417 parser.ParsedFilterStatement( 418 syscalls=(parser.Syscall('read', 0), ), 419 filters=[ 420 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 421 ], 422 token=None)) 423 self.assertEqualIgnoringToken( 424 self.parser.parse_filter_statement( 425 self._tokenize('{read, write}: arg0 == 0')), 426 parser.ParsedFilterStatement( 427 syscalls=( 428 parser.Syscall('read', 0), 429 parser.Syscall('write', 1), 430 ), 431 filters=[ 432 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 433 ], 434 token=None)) 435 self.assertEqualIgnoringToken( 436 self.parser.parse_filter_statement( 437 self._tokenize('io@libc: arg0 == 0')), 438 parser.ParsedFilterStatement( 439 syscalls=( 440 parser.Syscall('read', 0), 441 parser.Syscall('write', 1), 442 ), 443 filters=[ 444 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 445 ], 446 token=None)) 447 self.assertEqualIgnoringToken( 448 self.parser.parse_filter_statement( 449 self._tokenize('file-io@systemd: arg0 == 0')), 450 parser.ParsedFilterStatement( 451 syscalls=( 452 parser.Syscall('read', 0), 453 parser.Syscall('write', 1), 454 ), 455 filters=[ 456 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 457 ], 458 token=None)) 459 self.assertEqualIgnoringToken( 460 self.parser.parse_filter_statement( 461 self._tokenize('kill: arg0 == 0')), 462 parser.ParsedFilterStatement( 463 syscalls=( 464 parser.Syscall('kill', 62), 465 ), 466 filters=[ 467 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 468 ], 469 token=None)) 470 471 def test_parse_metadata(self): 472 """Accept valid filter statements with metadata.""" 473 self.assertEqualIgnoringToken( 474 self.parser.parse_filter_statement( 475 self._tokenize('read[arch=test]: arg0 == 0')), 476 parser.ParsedFilterStatement( 477 syscalls=( 478 parser.Syscall('read', 0), 479 ), 480 filters=[ 481 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 482 ], 483 token=None)) 484 self.assertEqualIgnoringToken( 485 self.parser.parse_filter_statement( 486 self._tokenize( 487 '{read, nonexistent[arch=nonexistent]}: arg0 == 0')), 488 parser.ParsedFilterStatement( 489 syscalls=( 490 parser.Syscall('read', 0), 491 ), 492 filters=[ 493 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 494 ], 495 token=None)) 496 497 def test_parse_unclosed_brace(self): 498 """Reject unclosed brace.""" 499 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 500 self.parser.parse_filter(self._tokenize('{ allow')) 501 502 def test_parse_invalid_syscall_group(self): 503 """Reject invalid syscall groups.""" 504 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 505 self.parser.parse_filter_statement( 506 self._tokenize('{ read, write: arg0 == 0')) 507 508 def test_parse_missing_colon(self): 509 """Reject missing colon.""" 510 with self.assertRaisesRegex(parser.ParseException, 'missing colon'): 511 self.parser.parse_filter_statement(self._tokenize('read')) 512 513 def test_parse_invalid_colon(self): 514 """Reject invalid colon.""" 515 with self.assertRaisesRegex(parser.ParseException, 'invalid colon'): 516 self.parser.parse_filter_statement(self._tokenize('read arg0')) 517 518 def test_parse_missing_filter(self): 519 """Reject missing filter.""" 520 with self.assertRaisesRegex(parser.ParseException, 'missing filter'): 521 self.parser.parse_filter_statement(self._tokenize('read:')) 522 523 524class ParseFileTests(unittest.TestCase): 525 """Tests for PolicyParser.parse_file.""" 526 527 def setUp(self): 528 self.arch = ARCH_64 529 self.parser = parser.PolicyParser( 530 self.arch, kill_action=bpf.KillProcess()) 531 self.tempdir = tempfile.mkdtemp() 532 533 def tearDown(self): 534 shutil.rmtree(self.tempdir) 535 536 def _write_file(self, filename, contents): 537 """Helper to write out a file for testing.""" 538 path = os.path.join(self.tempdir, filename) 539 with open(path, 'w') as outf: 540 outf.write(contents) 541 return path 542 543 def test_parse_simple(self): 544 """Allow simple policy files.""" 545 path = self._write_file( 546 'test.policy', """ 547 # Comment. 548 read: allow 549 write: allow 550 """) 551 552 self.assertEqual( 553 self.parser.parse_file(path), 554 parser.ParsedPolicy( 555 default_action=bpf.KillProcess(), 556 filter_statements=[ 557 parser.FilterStatement( 558 syscall=parser.Syscall('read', 0), 559 frequency=1, 560 filters=[ 561 parser.Filter(None, bpf.Allow()), 562 ]), 563 parser.FilterStatement( 564 syscall=parser.Syscall('write', 1), 565 frequency=1, 566 filters=[ 567 parser.Filter(None, bpf.Allow()), 568 ]), 569 ])) 570 571 def test_parse_multiline(self): 572 """Allow simple multi-line policy files.""" 573 path = self._write_file( 574 'test.policy', """ 575 # Comment. 576 read: \ 577 allow 578 write: allow 579 """) 580 581 self.assertEqual( 582 self.parser.parse_file(path), 583 parser.ParsedPolicy( 584 default_action=bpf.KillProcess(), 585 filter_statements=[ 586 parser.FilterStatement( 587 syscall=parser.Syscall('read', 0), 588 frequency=1, 589 filters=[ 590 parser.Filter(None, bpf.Allow()), 591 ]), 592 parser.FilterStatement( 593 syscall=parser.Syscall('write', 1), 594 frequency=1, 595 filters=[ 596 parser.Filter(None, bpf.Allow()), 597 ]), 598 ])) 599 600 def test_parse_default(self): 601 """Allow defining a default action.""" 602 path = self._write_file( 603 'test.policy', """ 604 @default kill-thread 605 read: allow 606 """) 607 608 self.assertEqual( 609 self.parser.parse_file(path), 610 parser.ParsedPolicy( 611 default_action=bpf.KillThread(), 612 filter_statements=[ 613 parser.FilterStatement( 614 syscall=parser.Syscall('read', 0), 615 frequency=1, 616 filters=[ 617 parser.Filter(None, bpf.Allow()), 618 ]), 619 ])) 620 621 def test_parse_default_permissive(self): 622 """Reject defining a permissive default action.""" 623 path = self._write_file( 624 'test.policy', """ 625 @default log 626 read: allow 627 """) 628 629 with self.assertRaisesRegex(parser.ParseException, 630 r'invalid permissive default action'): 631 self.parser.parse_file(path) 632 633 def test_parse_simple_grouped(self): 634 """Allow simple policy files.""" 635 path = self._write_file( 636 'test.policy', """ 637 # Comment. 638 {read, write}: allow 639 """) 640 641 self.assertEqual( 642 self.parser.parse_file(path), 643 parser.ParsedPolicy( 644 default_action=bpf.KillProcess(), 645 filter_statements=[ 646 parser.FilterStatement( 647 syscall=parser.Syscall('read', 0), 648 frequency=1, 649 filters=[ 650 parser.Filter(None, bpf.Allow()), 651 ]), 652 parser.FilterStatement( 653 syscall=parser.Syscall('write', 1), 654 frequency=1, 655 filters=[ 656 parser.Filter(None, bpf.Allow()), 657 ]), 658 ])) 659 660 def test_parse_other_arch(self): 661 """Allow entries that only target another architecture.""" 662 path = self._write_file( 663 'test.policy', """ 664 # Comment. 665 read[arch=nonexistent]: allow 666 write: allow 667 """) 668 669 self.assertEqual( 670 self.parser.parse_file(path), 671 parser.ParsedPolicy( 672 default_action=bpf.KillProcess(), 673 filter_statements=[ 674 parser.FilterStatement( 675 syscall=parser.Syscall('write', 1), 676 frequency=1, 677 filters=[ 678 parser.Filter(None, bpf.Allow()), 679 ]), 680 ])) 681 682 def test_parse_include(self): 683 """Allow including policy files.""" 684 path = self._write_file( 685 'test.include.policy', """ 686 {read, write}: arg0 == 0; allow 687 """) 688 path = self._write_file( 689 'test.policy', """ 690 @include ./test.include.policy 691 read: return ENOSYS 692 """) 693 694 self.assertEqual( 695 self.parser.parse_file(path), 696 parser.ParsedPolicy( 697 default_action=bpf.KillProcess(), 698 filter_statements=[ 699 parser.FilterStatement( 700 syscall=parser.Syscall('read', 0), 701 frequency=1, 702 filters=[ 703 parser.Filter([[parser.Atom(0, '==', 0)]], 704 bpf.Allow()), 705 parser.Filter( 706 None, 707 bpf.ReturnErrno( 708 self.arch.constants['ENOSYS'])), 709 ]), 710 parser.FilterStatement( 711 syscall=parser.Syscall('write', 1), 712 frequency=1, 713 filters=[ 714 parser.Filter([[parser.Atom(0, '==', 0)]], 715 bpf.Allow()), 716 parser.Filter(None, bpf.KillProcess()), 717 ]), 718 ])) 719 720 def test_parse_invalid_include(self): 721 """Reject including invalid policy files.""" 722 with self.assertRaisesRegex(parser.ParseException, 723 r'empty include path'): 724 path = self._write_file( 725 'test.policy', """ 726 @include 727 """) 728 self.parser.parse_file(path) 729 730 with self.assertRaisesRegex(parser.ParseException, 731 r'invalid include path'): 732 path = self._write_file( 733 'test.policy', """ 734 @include arg0 735 """) 736 self.parser.parse_file(path) 737 738 with self.assertRaisesRegex(parser.ParseException, 739 r'@include statement nested too deep'): 740 path = self._write_file( 741 'test.policy', """ 742 @include ./test.policy 743 """) 744 self.parser.parse_file(path) 745 746 with self.assertRaisesRegex(parser.ParseException, 747 r'Could not @include .*'): 748 path = self._write_file( 749 'test.policy', """ 750 @include ./nonexistent.policy 751 """) 752 self.parser.parse_file(path) 753 754 def test_parse_frequency(self): 755 """Allow including frequency files.""" 756 self._write_file( 757 'test.frequency', """ 758 read: 2 759 write: 3 760 """) 761 path = self._write_file( 762 'test.policy', """ 763 @frequency ./test.frequency 764 read: allow 765 """) 766 767 self.assertEqual( 768 self.parser.parse_file(path), 769 parser.ParsedPolicy( 770 default_action=bpf.KillProcess(), 771 filter_statements=[ 772 parser.FilterStatement( 773 syscall=parser.Syscall('read', 0), 774 frequency=2, 775 filters=[ 776 parser.Filter(None, bpf.Allow()), 777 ]), 778 ])) 779 780 def test_parse_invalid_frequency(self): 781 """Reject including invalid frequency files.""" 782 path = self._write_file('test.policy', 783 """@frequency ./test.frequency""") 784 785 with self.assertRaisesRegex(parser.ParseException, r'missing colon'): 786 self._write_file('test.frequency', """ 787 read 788 """) 789 self.parser.parse_file(path) 790 791 with self.assertRaisesRegex(parser.ParseException, r'invalid colon'): 792 self._write_file('test.frequency', """ 793 read foo 794 """) 795 self.parser.parse_file(path) 796 797 with self.assertRaisesRegex(parser.ParseException, r'missing number'): 798 self._write_file('test.frequency', """ 799 read: 800 """) 801 self.parser.parse_file(path) 802 803 with self.assertRaisesRegex(parser.ParseException, r'invalid number'): 804 self._write_file('test.frequency', """ 805 read: foo 806 """) 807 self.parser.parse_file(path) 808 809 with self.assertRaisesRegex(parser.ParseException, r'invalid number'): 810 self._write_file('test.frequency', """ 811 read: -1 812 """) 813 self.parser.parse_file(path) 814 815 with self.assertRaisesRegex(parser.ParseException, 816 r'empty frequency path'): 817 path = self._write_file( 818 'test.policy', """ 819 @frequency 820 """) 821 self.parser.parse_file(path) 822 823 with self.assertRaisesRegex(parser.ParseException, 824 r'invalid frequency path'): 825 path = self._write_file( 826 'test.policy', """ 827 @frequency arg0 828 """) 829 self.parser.parse_file(path) 830 831 with self.assertRaisesRegex(parser.ParseException, 832 r'Could not open frequency file.*'): 833 path = self._write_file( 834 'test.policy', """ 835 @frequency ./nonexistent.frequency 836 """) 837 self.parser.parse_file(path) 838 839 def test_parse_multiple_unconditional(self): 840 """Reject actions after an unconditional action.""" 841 path = self._write_file( 842 'test.policy', """ 843 read: allow 844 read: allow 845 """) 846 847 with self.assertRaisesRegex( 848 parser.ParseException, 849 (r'test.policy\(3:17\): ' 850 r'Syscall read.*already had an unconditional action ' 851 r'applied')): 852 self.parser.parse_file(path) 853 854 path = self._write_file( 855 'test.policy', """ 856 read: log 857 read: arg0 == 0; log 858 """) 859 860 with self.assertRaisesRegex( 861 parser.ParseException, 862 (r'test.policy\(3:17\): ' 863 r'Syscall read.*already had an unconditional action ' 864 r'applied')): 865 self.parser.parse_file(path) 866 867 868if __name__ == '__main__': 869 unittest.main() 870