1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2018 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17"""Unittests for the parser module.""" 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import print_function 22 23import os 24import shutil 25import tempfile 26import unittest 27 28import arch 29import bpf 30import parser # pylint: disable=wrong-import-order 31 32ARCH_64 = arch.Arch.load_from_json( 33 os.path.join( 34 os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json')) 35 36 37class TokenizerTests(unittest.TestCase): 38 """Tests for ParserState.tokenize.""" 39 40 @staticmethod 41 def _tokenize(line): 42 parser_state = parser.ParserState('<memory>') 43 return list(parser_state.tokenize([line]))[0] 44 45 def test_tokenize(self): 46 """Accept valid tokens.""" 47 self.assertEqual([ 48 (token.type, token.value) 49 for token in TokenizerTests._tokenize('@include /minijail.policy') 50 ], [ 51 ('INCLUDE', '@include'), 52 ('PATH', '/minijail.policy'), 53 ]) 54 self.assertEqual([ 55 (token.type, token.value) 56 for token in TokenizerTests._tokenize('@include ./minijail.policy') 57 ], [ 58 ('INCLUDE', '@include'), 59 ('PATH', './minijail.policy'), 60 ]) 61 self.assertEqual( 62 [(token.type, token.value) for token in TokenizerTests._tokenize( 63 'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0755; ' 64 'return ENOSYS # ignored')], [ 65 ('IDENTIFIER', 'read'), 66 ('COLON', ':'), 67 ('ARGUMENT', 'arg0'), 68 ('OP', 'in'), 69 ('BITWISE_COMPLEMENT', '~'), 70 ('NUMERIC_CONSTANT', '0xffff'), 71 ('OR', '||'), 72 ('ARGUMENT', 'arg0'), 73 ('OP', '&'), 74 ('LPAREN', '('), 75 ('NUMERIC_CONSTANT', '1'), 76 ('BITWISE_OR', '|'), 77 ('NUMERIC_CONSTANT', '2'), 78 ('RPAREN', ')'), 79 ('AND', '&&'), 80 ('ARGUMENT', 'arg0'), 81 ('OP', '=='), 82 ('NUMERIC_CONSTANT', '0755'), 83 ('SEMICOLON', ';'), 84 ('RETURN', 'return'), 85 ('IDENTIFIER', 'ENOSYS'), 86 ]) 87 # Ensure that tokens that have an otherwise valid token as prefix are 88 # still matched correctly. 89 self.assertEqual([ 90 (token.type, token.value) 91 for token in TokenizerTests._tokenize( 92 'inotify_wait return_sys killall trace_sys') 93 ], [ 94 ('IDENTIFIER', 'inotify_wait'), 95 ('IDENTIFIER', 'return_sys'), 96 ('IDENTIFIER', 'killall'), 97 ('IDENTIFIER', 'trace_sys'), 98 ]) 99 100 def test_tokenize_invalid_token(self): 101 """Reject tokenizer errors.""" 102 with self.assertRaisesRegex(parser.ParseException, 103 (r'<memory>\(1:1\): invalid token\n' 104 r' %invalid-token%\n' 105 r' \^')): 106 TokenizerTests._tokenize('%invalid-token%') 107 108 109class ParseConstantTests(unittest.TestCase): 110 """Tests for PolicyParser.parse_value.""" 111 112 def setUp(self): 113 self.arch = ARCH_64 114 self.parser = parser.PolicyParser( 115 self.arch, kill_action=bpf.KillProcess()) 116 117 def _tokenize(self, line): 118 # pylint: disable=protected-access 119 return list(self.parser._parser_state.tokenize([line]))[0] 120 121 def test_parse_constant_unsigned(self): 122 """Accept reasonably-sized unsigned constants.""" 123 self.assertEqual( 124 self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000) 125 if self.arch.bits == 64: 126 self.assertEqual( 127 self.parser.parse_value(self._tokenize('0x8000000000000000')), 128 0x8000000000000000) 129 130 def test_parse_constant_unsigned_too_big(self): 131 """Reject unreasonably-sized unsigned constants.""" 132 if self.arch.bits == 32: 133 with self.assertRaisesRegex(parser.ParseException, 134 'unsigned overflow'): 135 self.parser.parse_value(self._tokenize('0x100000000')) 136 with self.assertRaisesRegex(parser.ParseException, 137 'unsigned overflow'): 138 self.parser.parse_value(self._tokenize('0x10000000000000000')) 139 140 def test_parse_constant_signed(self): 141 """Accept reasonably-sized signed constants.""" 142 self.assertEqual( 143 self.parser.parse_value(self._tokenize('-1')), 144 self.arch.max_unsigned) 145 146 def test_parse_constant_signed_too_negative(self): 147 """Reject unreasonably-sized signed constants.""" 148 if self.arch.bits == 32: 149 with self.assertRaisesRegex(parser.ParseException, 150 'signed underflow'): 151 self.parser.parse_value(self._tokenize('-0x800000001')) 152 with self.assertRaisesRegex(parser.ParseException, 'signed underflow'): 153 self.parser.parse_value(self._tokenize('-0x8000000000000001')) 154 155 def test_parse_mask(self): 156 """Accept parsing a mask value.""" 157 self.assertEqual( 158 self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf) 159 160 def test_parse_parenthesized_expressions(self): 161 """Accept parsing parenthesized expressions.""" 162 bad_expressions = [ 163 '(1', 164 '|(1)', 165 '(1)|', 166 '()', 167 '(', 168 '((', 169 '(()', 170 '(()1', 171 ] 172 for expression in bad_expressions: 173 with self.assertRaises(parser.ParseException, msg=expression): 174 self.parser.parse_value(self._tokenize(expression)) 175 176 bad_partial_expressions = [ 177 '1)', 178 '(1)1', 179 '1(0)', 180 ] 181 for expression in bad_partial_expressions: 182 tokens = self._tokenize(expression) 183 self.parser.parse_value(tokens) 184 self.assertNotEqual(tokens, []) 185 186 good_expressions = [ 187 '(3)', 188 '(1)|2', 189 '1|(2)', 190 '(1)|(2)', 191 '((3))', 192 '0|(1|2)', 193 '(0|1|2)', 194 ] 195 for expression in good_expressions: 196 self.assertEqual( 197 self.parser.parse_value(self._tokenize(expression)), 3) 198 199 def test_parse_constant_complements(self): 200 """Accept complementing constants.""" 201 self.assertEqual( 202 self.parser.parse_value(self._tokenize('~0')), 203 self.arch.max_unsigned) 204 self.assertEqual( 205 self.parser.parse_value(self._tokenize('~0|~0')), 206 self.arch.max_unsigned) 207 if self.arch.bits == 32: 208 self.assertEqual( 209 self.parser.parse_value( 210 self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00) 211 self.assertEqual( 212 self.parser.parse_value( 213 self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')), 214 0xFF0000FF) 215 else: 216 self.assertEqual( 217 self.parser.parse_value( 218 self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')), 219 0xFFFFFFFFFFFF0000) 220 self.assertEqual( 221 self.parser.parse_value( 222 self._tokenize( 223 '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00' 224 )), 0xFFFF00000000FFFF) 225 226 def test_parse_double_complement(self): 227 """Reject double-complementing constants.""" 228 with self.assertRaisesRegex(parser.ParseException, 229 'double complement'): 230 self.parser.parse_value(self._tokenize('~~0')) 231 232 def test_parse_empty_complement(self): 233 """Reject complementing nothing.""" 234 with self.assertRaisesRegex(parser.ParseException, 'empty complement'): 235 self.parser.parse_value(self._tokenize('0|~')) 236 237 def test_parse_named_constant(self): 238 """Accept parsing a named constant.""" 239 self.assertEqual( 240 self.parser.parse_value(self._tokenize('O_RDONLY')), 0) 241 242 def test_parse_empty_constant(self): 243 """Reject parsing nothing.""" 244 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 245 self.parser.parse_value([]) 246 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 247 self.parser.parse_value(self._tokenize('0|')) 248 249 def test_parse_invalid_constant(self): 250 """Reject parsing invalid constants.""" 251 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 252 self.parser.parse_value(self._tokenize('foo')) 253 254 255class ParseFilterExpressionTests(unittest.TestCase): 256 """Tests for PolicyParser.parse_argument_expression.""" 257 258 def setUp(self): 259 self.arch = ARCH_64 260 self.parser = parser.PolicyParser( 261 self.arch, kill_action=bpf.KillProcess()) 262 263 def _tokenize(self, line): 264 # pylint: disable=protected-access 265 return list(self.parser._parser_state.tokenize([line]))[0] 266 267 def test_parse_argument_expression(self): 268 """Accept valid argument expressions.""" 269 self.assertEqual( 270 self.parser.parse_argument_expression( 271 self._tokenize( 272 'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE' 273 )), [ 274 [parser.Atom(0, 'in', 0xffff)], 275 [parser.Atom(0, '==', 4), 276 parser.Atom(1, '==', 2)], 277 ]) 278 279 def test_parse_number_argument_expression(self): 280 """Accept valid argument expressions with any octal/decimal/hex number.""" 281 # 4607 == 010777 == 0x11ff 282 self.assertEqual( 283 self.parser.parse_argument_expression( 284 self._tokenize('arg0 in 4607')), [ 285 [parser.Atom(0, 'in', 4607)], 286 ]) 287 288 self.assertEqual( 289 self.parser.parse_argument_expression( 290 self._tokenize('arg0 in 010777')), [ 291 [parser.Atom(0, 'in', 4607)], 292 ]) 293 294 self.assertEqual( 295 self.parser.parse_argument_expression( 296 self._tokenize('arg0 in 0x11ff')), [ 297 [parser.Atom(0, 'in', 4607)], 298 ]) 299 300 def test_parse_empty_argument_expression(self): 301 """Reject empty argument expressions.""" 302 with self.assertRaisesRegex(parser.ParseException, 303 'empty argument expression'): 304 self.parser.parse_argument_expression( 305 self._tokenize('arg0 in 0xffff ||')) 306 307 def test_parse_empty_clause(self): 308 """Reject empty clause.""" 309 with self.assertRaisesRegex(parser.ParseException, 'empty clause'): 310 self.parser.parse_argument_expression( 311 self._tokenize('arg0 in 0xffff &&')) 312 313 def test_parse_invalid_argument(self): 314 """Reject invalid argument.""" 315 with self.assertRaisesRegex(parser.ParseException, 'invalid argument'): 316 self.parser.parse_argument_expression( 317 self._tokenize('argX in 0xffff')) 318 319 def test_parse_invalid_operator(self): 320 """Reject invalid operator.""" 321 with self.assertRaisesRegex(parser.ParseException, 'invalid operator'): 322 self.parser.parse_argument_expression( 323 self._tokenize('arg0 = 0xffff')) 324 325 def test_parse_missing_operator(self): 326 """Reject missing operator.""" 327 with self.assertRaisesRegex(parser.ParseException, 'missing operator'): 328 self.parser.parse_argument_expression(self._tokenize('arg0')) 329 330 def test_parse_missing_operand(self): 331 """Reject missing operand.""" 332 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 333 self.parser.parse_argument_expression(self._tokenize('arg0 ==')) 334 335 336class ParseFilterTests(unittest.TestCase): 337 """Tests for PolicyParser.parse_filter.""" 338 339 def setUp(self): 340 self.arch = ARCH_64 341 self.parser = parser.PolicyParser( 342 self.arch, kill_action=bpf.KillProcess()) 343 344 def _tokenize(self, line): 345 # pylint: disable=protected-access 346 return list(self.parser._parser_state.tokenize([line]))[0] 347 348 def test_parse_filter(self): 349 """Accept valid filters.""" 350 self.assertEqual( 351 self.parser.parse_filter(self._tokenize('arg0 == 0')), [ 352 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 353 ]) 354 self.assertEqual( 355 self.parser.parse_filter(self._tokenize('kill-process')), [ 356 parser.Filter(None, bpf.KillProcess()), 357 ]) 358 self.assertEqual( 359 self.parser.parse_filter(self._tokenize('kill-thread')), [ 360 parser.Filter(None, bpf.KillThread()), 361 ]) 362 self.assertEqual( 363 self.parser.parse_filter(self._tokenize('trap')), [ 364 parser.Filter(None, bpf.Trap()), 365 ]) 366 self.assertEqual( 367 self.parser.parse_filter(self._tokenize('return ENOSYS')), [ 368 parser.Filter(None, 369 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 370 ]) 371 self.assertEqual( 372 self.parser.parse_filter(self._tokenize('trace')), [ 373 parser.Filter(None, bpf.Trace()), 374 ]) 375 self.assertEqual( 376 self.parser.parse_filter(self._tokenize('user-notify')), [ 377 parser.Filter(None, bpf.UserNotify()), 378 ]) 379 self.assertEqual( 380 self.parser.parse_filter(self._tokenize('log')), [ 381 parser.Filter(None, bpf.Log()), 382 ]) 383 self.assertEqual( 384 self.parser.parse_filter(self._tokenize('allow')), [ 385 parser.Filter(None, bpf.Allow()), 386 ]) 387 self.assertEqual( 388 self.parser.parse_filter(self._tokenize('1')), [ 389 parser.Filter(None, bpf.Allow()), 390 ]) 391 self.assertEqual( 392 self.parser.parse_filter( 393 self._tokenize( 394 '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')), 395 [ 396 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 397 parser.Filter([[parser.Atom(0, '==', 1)]], 398 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 399 parser.Filter(None, bpf.Trap()), 400 ]) 401 402 def test_parse_missing_return_value(self): 403 """Reject missing return value.""" 404 with self.assertRaisesRegex(parser.ParseException, 405 'missing return value'): 406 self.parser.parse_filter(self._tokenize('return')) 407 408 def test_parse_invalid_return_value(self): 409 """Reject invalid return value.""" 410 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 411 self.parser.parse_filter(self._tokenize('return arg0')) 412 413 def test_parse_unclosed_brace(self): 414 """Reject unclosed brace.""" 415 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 416 self.parser.parse_filter(self._tokenize('{ allow')) 417 418 419class ParseFilterDenylistTests(unittest.TestCase): 420 """Tests for PolicyParser.parse_filter with a denylist policy.""" 421 422 def setUp(self): 423 self.arch = ARCH_64 424 self.kill_action = bpf.KillProcess() 425 self.parser = parser.PolicyParser( 426 self.arch, kill_action=self.kill_action, denylist=True) 427 428 def _tokenize(self, line): 429 # pylint: disable=protected-access 430 return list(self.parser._parser_state.tokenize([line]))[0] 431 432 def test_parse_filter(self): 433 """Accept only filters that return an errno.""" 434 self.assertEqual( 435 self.parser.parse_filter(self._tokenize('arg0 == 0; return ENOSYS')), 436 [ 437 parser.Filter([[parser.Atom(0, '==', 0)]], 438 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 439 ]) 440 441 442class ParseFilterStatementTests(unittest.TestCase): 443 """Tests for PolicyParser.parse_filter_statement.""" 444 445 def setUp(self): 446 self.arch = ARCH_64 447 self.parser = parser.PolicyParser( 448 self.arch, kill_action=bpf.KillProcess()) 449 450 def _tokenize(self, line): 451 # pylint: disable=protected-access 452 return list(self.parser._parser_state.tokenize([line]))[0] 453 454 def assertEqualIgnoringToken(self, actual, expected, msg=None): 455 """Similar to assertEqual, but ignores the token field.""" 456 if (actual.syscalls != expected.syscalls or 457 actual.filters != expected.filters): 458 self.fail('%r != %r' % (actual, expected), msg) 459 460 def test_parse_filter_statement(self): 461 """Accept valid filter statements.""" 462 self.assertEqualIgnoringToken( 463 self.parser.parse_filter_statement( 464 self._tokenize('read: arg0 == 0')), 465 parser.ParsedFilterStatement( 466 syscalls=(parser.Syscall('read', 0), ), 467 filters=[ 468 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 469 ], 470 token=None)) 471 self.assertEqualIgnoringToken( 472 self.parser.parse_filter_statement( 473 self._tokenize('{read, write}: arg0 == 0')), 474 parser.ParsedFilterStatement( 475 syscalls=( 476 parser.Syscall('read', 0), 477 parser.Syscall('write', 1), 478 ), 479 filters=[ 480 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 481 ], 482 token=None)) 483 self.assertEqualIgnoringToken( 484 self.parser.parse_filter_statement( 485 self._tokenize('io@libc: arg0 == 0')), 486 parser.ParsedFilterStatement( 487 syscalls=( 488 parser.Syscall('read', 0), 489 parser.Syscall('write', 1), 490 ), 491 filters=[ 492 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 493 ], 494 token=None)) 495 self.assertEqualIgnoringToken( 496 self.parser.parse_filter_statement( 497 self._tokenize('file-io@systemd: arg0 == 0')), 498 parser.ParsedFilterStatement( 499 syscalls=( 500 parser.Syscall('read', 0), 501 parser.Syscall('write', 1), 502 ), 503 filters=[ 504 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 505 ], 506 token=None)) 507 self.assertEqualIgnoringToken( 508 self.parser.parse_filter_statement( 509 self._tokenize('kill: arg0 == 0')), 510 parser.ParsedFilterStatement( 511 syscalls=( 512 parser.Syscall('kill', 62), 513 ), 514 filters=[ 515 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 516 ], 517 token=None)) 518 519 def test_parse_metadata(self): 520 """Accept valid filter statements with metadata.""" 521 self.assertEqualIgnoringToken( 522 self.parser.parse_filter_statement( 523 self._tokenize('read[arch=test]: arg0 == 0')), 524 parser.ParsedFilterStatement( 525 syscalls=( 526 parser.Syscall('read', 0), 527 ), 528 filters=[ 529 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 530 ], 531 token=None)) 532 self.assertEqualIgnoringToken( 533 self.parser.parse_filter_statement( 534 self._tokenize( 535 '{read, nonexistent[arch=nonexistent]}: arg0 == 0')), 536 parser.ParsedFilterStatement( 537 syscalls=( 538 parser.Syscall('read', 0), 539 ), 540 filters=[ 541 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 542 ], 543 token=None)) 544 545 def test_parse_unclosed_brace(self): 546 """Reject unclosed brace.""" 547 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 548 self.parser.parse_filter(self._tokenize('{ allow')) 549 550 def test_parse_invalid_syscall_group(self): 551 """Reject invalid syscall groups.""" 552 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 553 self.parser.parse_filter_statement( 554 self._tokenize('{ read, write: arg0 == 0')) 555 556 def test_parse_missing_colon(self): 557 """Reject missing colon.""" 558 with self.assertRaisesRegex(parser.ParseException, 'missing colon'): 559 self.parser.parse_filter_statement(self._tokenize('read')) 560 561 def test_parse_invalid_colon(self): 562 """Reject invalid colon.""" 563 with self.assertRaisesRegex(parser.ParseException, 'invalid colon'): 564 self.parser.parse_filter_statement(self._tokenize('read arg0')) 565 566 def test_parse_missing_filter(self): 567 """Reject missing filter.""" 568 with self.assertRaisesRegex(parser.ParseException, 'missing filter'): 569 self.parser.parse_filter_statement(self._tokenize('read:')) 570 571 572class ParseFileTests(unittest.TestCase): 573 """Tests for PolicyParser.parse_file.""" 574 575 def setUp(self): 576 self.arch = ARCH_64 577 self.parser = parser.PolicyParser( 578 self.arch, kill_action=bpf.KillProcess()) 579 self.tempdir = tempfile.mkdtemp() 580 581 def tearDown(self): 582 shutil.rmtree(self.tempdir) 583 584 def _write_file(self, filename, contents): 585 """Helper to write out a file for testing.""" 586 path = os.path.join(self.tempdir, filename) 587 with open(path, 'w') as outf: 588 outf.write(contents) 589 return path 590 591 def test_parse_simple(self): 592 """Allow simple policy files.""" 593 path = self._write_file( 594 'test.policy', """ 595 # Comment. 596 read: allow 597 write: allow 598 """) 599 600 self.assertEqual( 601 self.parser.parse_file(path), 602 parser.ParsedPolicy( 603 default_action=bpf.KillProcess(), 604 filter_statements=[ 605 parser.FilterStatement( 606 syscall=parser.Syscall('read', 0), 607 frequency=1, 608 filters=[ 609 parser.Filter(None, bpf.Allow()), 610 ]), 611 parser.FilterStatement( 612 syscall=parser.Syscall('write', 1), 613 frequency=1, 614 filters=[ 615 parser.Filter(None, bpf.Allow()), 616 ]), 617 ])) 618 619 def test_parse_multiline(self): 620 """Allow simple multi-line policy files.""" 621 path = self._write_file( 622 'test.policy', """ 623 # Comment. 624 read: \ 625 allow 626 write: allow 627 """) 628 629 self.assertEqual( 630 self.parser.parse_file(path), 631 parser.ParsedPolicy( 632 default_action=bpf.KillProcess(), 633 filter_statements=[ 634 parser.FilterStatement( 635 syscall=parser.Syscall('read', 0), 636 frequency=1, 637 filters=[ 638 parser.Filter(None, bpf.Allow()), 639 ]), 640 parser.FilterStatement( 641 syscall=parser.Syscall('write', 1), 642 frequency=1, 643 filters=[ 644 parser.Filter(None, bpf.Allow()), 645 ]), 646 ])) 647 648 def test_parse_default(self): 649 """Allow defining a default action.""" 650 path = self._write_file( 651 'test.policy', """ 652 @default kill-thread 653 read: allow 654 """) 655 656 self.assertEqual( 657 self.parser.parse_file(path), 658 parser.ParsedPolicy( 659 default_action=bpf.KillThread(), 660 filter_statements=[ 661 parser.FilterStatement( 662 syscall=parser.Syscall('read', 0), 663 frequency=1, 664 filters=[ 665 parser.Filter(None, bpf.Allow()), 666 ]), 667 ])) 668 669 def test_parse_default_permissive(self): 670 """Reject defining a permissive default action.""" 671 path = self._write_file( 672 'test.policy', """ 673 @default log 674 read: allow 675 """) 676 677 with self.assertRaisesRegex(parser.ParseException, 678 r'invalid permissive default action'): 679 self.parser.parse_file(path) 680 681 def test_parse_simple_grouped(self): 682 """Allow simple policy files.""" 683 path = self._write_file( 684 'test.policy', """ 685 # Comment. 686 {read, write}: allow 687 """) 688 689 self.assertEqual( 690 self.parser.parse_file(path), 691 parser.ParsedPolicy( 692 default_action=bpf.KillProcess(), 693 filter_statements=[ 694 parser.FilterStatement( 695 syscall=parser.Syscall('read', 0), 696 frequency=1, 697 filters=[ 698 parser.Filter(None, bpf.Allow()), 699 ]), 700 parser.FilterStatement( 701 syscall=parser.Syscall('write', 1), 702 frequency=1, 703 filters=[ 704 parser.Filter(None, bpf.Allow()), 705 ]), 706 ])) 707 708 def test_parse_other_arch(self): 709 """Allow entries that only target another architecture.""" 710 path = self._write_file( 711 'test.policy', """ 712 # Comment. 713 read[arch=nonexistent]: allow 714 write: allow 715 """) 716 717 self.assertEqual( 718 self.parser.parse_file(path), 719 parser.ParsedPolicy( 720 default_action=bpf.KillProcess(), 721 filter_statements=[ 722 parser.FilterStatement( 723 syscall=parser.Syscall('write', 1), 724 frequency=1, 725 filters=[ 726 parser.Filter(None, bpf.Allow()), 727 ]), 728 ])) 729 730 def test_parse_include(self): 731 """Allow including policy files.""" 732 path = self._write_file( 733 'test.include.policy', """ 734 {read, write}: arg0 == 0; allow 735 """) 736 path = self._write_file( 737 'test.policy', """ 738 @include ./test.include.policy 739 read: return ENOSYS 740 """) 741 742 self.assertEqual( 743 self.parser.parse_file(path), 744 parser.ParsedPolicy( 745 default_action=bpf.KillProcess(), 746 filter_statements=[ 747 parser.FilterStatement( 748 syscall=parser.Syscall('read', 0), 749 frequency=1, 750 filters=[ 751 parser.Filter([[parser.Atom(0, '==', 0)]], 752 bpf.Allow()), 753 parser.Filter( 754 None, 755 bpf.ReturnErrno( 756 self.arch.constants['ENOSYS'])), 757 ]), 758 parser.FilterStatement( 759 syscall=parser.Syscall('write', 1), 760 frequency=1, 761 filters=[ 762 parser.Filter([[parser.Atom(0, '==', 0)]], 763 bpf.Allow()), 764 parser.Filter(None, bpf.KillProcess()), 765 ]), 766 ])) 767 768 def test_parse_invalid_include(self): 769 """Reject including invalid policy files.""" 770 with self.assertRaisesRegex(parser.ParseException, 771 r'empty include path'): 772 path = self._write_file( 773 'test.policy', """ 774 @include 775 """) 776 self.parser.parse_file(path) 777 778 with self.assertRaisesRegex(parser.ParseException, 779 r'invalid include path'): 780 path = self._write_file( 781 'test.policy', """ 782 @include arg0 783 """) 784 self.parser.parse_file(path) 785 786 with self.assertRaisesRegex(parser.ParseException, 787 r'@include statement nested too deep'): 788 path = self._write_file( 789 'test.policy', """ 790 @include ./test.policy 791 """) 792 self.parser.parse_file(path) 793 794 with self.assertRaisesRegex(parser.ParseException, 795 r'Could not @include .*'): 796 path = self._write_file( 797 'test.policy', """ 798 @include ./nonexistent.policy 799 """) 800 self.parser.parse_file(path) 801 802 def test_parse_frequency(self): 803 """Allow including frequency files.""" 804 self._write_file( 805 'test.frequency', """ 806 read: 2 807 write: 3 808 """) 809 path = self._write_file( 810 'test.policy', """ 811 @frequency ./test.frequency 812 read: allow 813 """) 814 815 self.assertEqual( 816 self.parser.parse_file(path), 817 parser.ParsedPolicy( 818 default_action=bpf.KillProcess(), 819 filter_statements=[ 820 parser.FilterStatement( 821 syscall=parser.Syscall('read', 0), 822 frequency=2, 823 filters=[ 824 parser.Filter(None, bpf.Allow()), 825 ]), 826 ])) 827 828 def test_parse_invalid_frequency(self): 829 """Reject including invalid frequency files.""" 830 path = self._write_file('test.policy', 831 """@frequency ./test.frequency""") 832 833 with self.assertRaisesRegex(parser.ParseException, r'missing colon'): 834 self._write_file('test.frequency', """ 835 read 836 """) 837 self.parser.parse_file(path) 838 839 with self.assertRaisesRegex(parser.ParseException, r'invalid colon'): 840 self._write_file('test.frequency', """ 841 read foo 842 """) 843 self.parser.parse_file(path) 844 845 with self.assertRaisesRegex(parser.ParseException, r'missing number'): 846 self._write_file('test.frequency', """ 847 read: 848 """) 849 self.parser.parse_file(path) 850 851 with self.assertRaisesRegex(parser.ParseException, r'invalid number'): 852 self._write_file('test.frequency', """ 853 read: foo 854 """) 855 self.parser.parse_file(path) 856 857 with self.assertRaisesRegex(parser.ParseException, r'invalid number'): 858 self._write_file('test.frequency', """ 859 read: -1 860 """) 861 self.parser.parse_file(path) 862 863 with self.assertRaisesRegex(parser.ParseException, 864 r'empty frequency path'): 865 path = self._write_file( 866 'test.policy', """ 867 @frequency 868 """) 869 self.parser.parse_file(path) 870 871 with self.assertRaisesRegex(parser.ParseException, 872 r'invalid frequency path'): 873 path = self._write_file( 874 'test.policy', """ 875 @frequency arg0 876 """) 877 self.parser.parse_file(path) 878 879 with self.assertRaisesRegex(parser.ParseException, 880 r'Could not open frequency file.*'): 881 path = self._write_file( 882 'test.policy', """ 883 @frequency ./nonexistent.frequency 884 """) 885 self.parser.parse_file(path) 886 887 def test_parse_multiple_unconditional(self): 888 """Reject actions after an unconditional action.""" 889 path = self._write_file( 890 'test.policy', """ 891 read: allow 892 read: allow 893 """) 894 895 with self.assertRaisesRegex( 896 parser.ParseException, 897 (r'test.policy\(3:17\): ' 898 r'Syscall read.*already had an unconditional action ' 899 r'applied')): 900 self.parser.parse_file(path) 901 902 path = self._write_file( 903 'test.policy', """ 904 read: log 905 read: arg0 == 0; log 906 """) 907 908 with self.assertRaisesRegex( 909 parser.ParseException, 910 (r'test.policy\(3:17\): ' 911 r'Syscall read.*already had an unconditional action ' 912 r'applied')): 913 self.parser.parse_file(path) 914 915 def test_parse_allowlist_denylist_header(self): 916 """Reject trying to compile denylist policy file as allowlist.""" 917 with self.assertRaisesRegex(parser.ParseException, 918 r'policy is denylist, but flag --denylist ' 919 'not passed in'): 920 path = self._write_file( 921 'test.policy', """ 922 @denylist 923 """) 924 self.parser.parse_file(path) 925 926 927class ParseFileDenylistTests(unittest.TestCase): 928 """Tests for PolicyParser.parse_file.""" 929 930 def setUp(self): 931 self.arch = ARCH_64 932 self.kill_action = bpf.KillProcess() 933 self.parser = parser.PolicyParser( 934 self.arch, kill_action=self.kill_action, denylist=True) 935 self.tempdir = tempfile.mkdtemp() 936 937 def tearDown(self): 938 shutil.rmtree(self.tempdir) 939 940 def _write_file(self, filename, contents): 941 """Helper to write out a file for testing.""" 942 path = os.path.join(self.tempdir, filename) 943 with open(path, 'w') as outf: 944 outf.write(contents) 945 return path 946 947 def test_parse_simple(self): 948 """Allow simple denylist policy files.""" 949 path = self._write_file( 950 'test.policy', """ 951 # Comment. 952 @denylist 953 read: return ENOSYS 954 write: return ENOSYS 955 """) 956 957 self.assertEqual( 958 self.parser.parse_file(path), 959 parser.ParsedPolicy( 960 default_action=bpf.Allow(), 961 filter_statements=[ 962 parser.FilterStatement( 963 syscall=parser.Syscall('read', 0), 964 frequency=1, 965 filters=[ 966 parser.Filter(None, bpf.ReturnErrno( 967 self.arch.constants['ENOSYS'])), 968 ]), 969 parser.FilterStatement( 970 syscall=parser.Syscall('write', 1), 971 frequency=1, 972 filters=[ 973 parser.Filter(None, bpf.ReturnErrno( 974 self.arch.constants['ENOSYS'])), 975 ]), 976 ])) 977 978 def test_parse_simple_with_arg(self): 979 """Allow simple denylist policy files.""" 980 path = self._write_file( 981 'test.policy', """ 982 # Comment. 983 @denylist 984 read: return ENOSYS 985 write: arg0 == 0 ; return ENOSYS 986 """) 987 988 self.assertEqual( 989 self.parser.parse_file(path), 990 parser.ParsedPolicy( 991 default_action=bpf.Allow(), 992 filter_statements=[ 993 parser.FilterStatement( 994 syscall=parser.Syscall('read', 0), 995 frequency=1, 996 filters=[ 997 parser.Filter(None, bpf.ReturnErrno( 998 self.arch.constants['ENOSYS'])), 999 ]), 1000 parser.FilterStatement( 1001 syscall=parser.Syscall('write', 1), 1002 frequency=1, 1003 filters=[ 1004 parser.Filter([[parser.Atom(0, '==', 0)]], 1005 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 1006 parser.Filter(None, bpf.Allow()), 1007 ]), 1008 ])) 1009 1010 1011 def test_parse_denylist_no_header(self): 1012 """Reject trying to compile denylist policy file as allowlist.""" 1013 with self.assertRaisesRegex(parser.ParseException, 1014 r'policy must contain @denylist flag to be ' 1015 'compiled with --denylist flag'): 1016 path = self._write_file( 1017 'test.policy', """ 1018 read: return ENOSYS 1019 """) 1020 self.parser.parse_file(path) 1021 1022if __name__ == '__main__': 1023 unittest.main() 1024