1# Copyright 2020 The ChromiumOS Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""A parser for the Minijail policy file.""" 6 7import collections 8import itertools 9import os.path 10import re 11 12 13try: 14 import bpf 15except ImportError: 16 from minijail import bpf 17 18 19# Representations of numbers with different radix (base) in C. 20HEX_REGEX = r"-?0[xX][0-9a-fA-F]+" 21OCTAL_REGEX = r"-?0[0-7]+" 22DECIMAL_REGEX = r"-?[0-9]+" 23 24 25Token = collections.namedtuple( 26 "Token", ["type", "value", "filename", "line", "line_number", "column"] 27) 28 29# A regex that can tokenize a Minijail policy file line. 30_TOKEN_SPECIFICATION = ( 31 ("COMMENT", r"#.*$"), 32 ("WHITESPACE", r"\s+"), 33 ("CONTINUATION", r"\\$"), 34 ("DEFAULT", r"@default\b"), 35 ("INCLUDE", r"@include\b"), 36 ("FREQUENCY", r"@frequency\b"), 37 ("DENYLIST", r"@denylist$"), 38 ("PATH", r"(?:\.)?/\S+"), 39 ("NUMERIC_CONSTANT", f"{HEX_REGEX}|{OCTAL_REGEX}|{DECIMAL_REGEX}"), 40 ("COLON", r":"), 41 ("SEMICOLON", r";"), 42 ("COMMA", r","), 43 ("BITWISE_COMPLEMENT", r"~"), 44 ("LPAREN", r"\("), 45 ("RPAREN", r"\)"), 46 ("LBRACE", r"\{"), 47 ("RBRACE", r"\}"), 48 ("RBRACKET", r"\]"), 49 ("LBRACKET", r"\["), 50 ("OR", r"\|\|"), 51 ("AND", r"&&"), 52 ("BITWISE_OR", r"\|"), 53 ("OP", r"&|\bin\b|==|!=|<=|<|>=|>"), 54 ("EQUAL", r"="), 55 ("ARGUMENT", r"\barg[0-9]+\b"), 56 ("RETURN", r"\breturn\b"), 57 ( 58 "ACTION", 59 r"\ballow\b|\bkill-process\b|\bkill-thread\b|\bkill\b|\btrap\b|" 60 r"\btrace\b|\blog\b|\buser-notify\b", 61 ), 62 ("IDENTIFIER", r"[a-zA-Z_][a-zA-Z_0-9-@]*"), 63) 64_TOKEN_RE = re.compile( 65 "|".join(r"(?P<%s>%s)" % pair for pair in _TOKEN_SPECIFICATION) 66) 67 68 69class ParseException(Exception): 70 """An exception that is raised when parsing fails.""" 71 72 # pylint: disable=too-many-arguments 73 def __init__( 74 self, message, filename, *, line="", line_number=1, token=None 75 ): 76 if token: 77 line = token.line 78 line_number = token.line_number 79 column = token.column 80 length = len(token.value) 81 else: 82 column = len(line) 83 length = 1 84 85 message = ("%s(%d:%d): %s") % ( 86 filename, 87 line_number, 88 column + 1, 89 message, 90 ) 91 message += "\n %s" % line 92 message += "\n %s%s" % (" " * column, "^" * length) 93 super().__init__(message) 94 95 96class ParserState: 97 """Stores the state of the Parser to provide better diagnostics.""" 98 99 def __init__(self, filename): 100 self._filename = filename 101 self._line = "" 102 self._line_number = 0 103 104 @property 105 def filename(self): 106 """Return the name of the file being processed.""" 107 return self._filename 108 109 @property 110 def line(self): 111 """Return the current line being processed.""" 112 return self._line 113 114 @property 115 def line_number(self): 116 """Return the current line number being processed.""" 117 return self._line_number 118 119 def error(self, message, token=None): 120 """Raise a ParserException with the provided message.""" 121 raise ParseException( 122 message, 123 self.filename, 124 line=self._line, 125 line_number=self._line_number, 126 token=token, 127 ) 128 129 def tokenize(self, lines): 130 """Return a list of tokens for the current line.""" 131 tokens = [] 132 133 for line_number, line in enumerate(lines): 134 self._line_number = line_number + 1 135 self._line = line.rstrip("\r\n") 136 137 last_end = 0 138 for token in _TOKEN_RE.finditer(self._line): 139 if token.start() != last_end: 140 self.error( 141 "invalid token", 142 token=Token( 143 "INVALID", 144 self._line[last_end : token.start()], 145 self.filename, 146 self._line, 147 self._line_number, 148 last_end, 149 ), 150 ) 151 last_end = token.end() 152 153 # Omit whitespace and comments now to avoid sprinkling this 154 # logic elsewhere. 155 if token.lastgroup in ("WHITESPACE", "COMMENT", "CONTINUATION"): 156 continue 157 tokens.append( 158 Token( 159 token.lastgroup, 160 token.group(), 161 self.filename, 162 self._line, 163 self._line_number, 164 token.start(), 165 ) 166 ) 167 if last_end != len(self._line): 168 self.error( 169 "invalid token", 170 token=Token( 171 "INVALID", 172 self._line[last_end:], 173 self.filename, 174 self._line, 175 self._line_number, 176 last_end, 177 ), 178 ) 179 180 if self._line.endswith("\\"): 181 # This line is not finished yet. 182 continue 183 184 if tokens: 185 # Return a copy of the token list so that the caller can be free 186 # to modify it. 187 yield tokens[::] 188 tokens.clear() 189 190 191Atom = collections.namedtuple("Atom", ["argument_index", "op", "value"]) 192"""A single boolean comparison within a filter expression.""" 193 194Filter = collections.namedtuple("Filter", ["expression", "action"]) 195"""The result of parsing a DNF filter expression, with its action. 196 197Since the expression is in Disjunctive Normal Form, it is composed of two levels 198of lists, one for disjunctions and the inner one for conjunctions. The elements 199of the inner list are Atoms. 200""" 201 202Syscall = collections.namedtuple("Syscall", ["name", "number"]) 203"""A system call.""" 204 205ParsedFilterStatement = collections.namedtuple( 206 "ParsedFilterStatement", ["syscalls", "filters", "token"] 207) 208"""The result of parsing a filter statement. 209 210Statements have a list of syscalls, and an associated list of filters that will 211be evaluated sequentially when any of the syscalls is invoked. 212""" 213 214FilterStatement = collections.namedtuple( 215 "FilterStatement", ["syscall", "frequency", "filters"] 216) 217"""The filter list for a particular syscall. 218 219This is a mapping from one syscall to a list of filters that are evaluated 220sequentially. The last filter is always an unconditional action. 221""" 222 223ParsedPolicy = collections.namedtuple( 224 "ParsedPolicy", ["default_action", "filter_statements"] 225) 226"""The result of parsing a minijail .policy file.""" 227 228 229# pylint: disable=too-few-public-methods 230class PolicyParser: 231 """A parser for the Minijail seccomp policy file format.""" 232 233 def __init__( 234 self, 235 arch, 236 *, 237 kill_action, 238 include_depth_limit=10, 239 override_default_action=None, 240 denylist=False, 241 ret_log=False, 242 ): 243 self._parser_states = [ParserState("<memory>")] 244 self._kill_action = kill_action 245 self._include_depth_limit = include_depth_limit 246 if denylist: 247 self._default_action = bpf.Allow() 248 else: 249 self._default_action = self._kill_action 250 self._override_default_action = override_default_action 251 self._frequency_mapping = collections.defaultdict(int) 252 self._arch = arch 253 self._denylist = denylist 254 self._ret_log = ret_log 255 256 @property 257 def _parser_state(self): 258 return self._parser_states[-1] 259 260 # single-constant = identifier 261 # | numeric-constant 262 # ; 263 def _parse_single_constant(self, token): 264 if token.type == "IDENTIFIER": 265 if token.value not in self._arch.constants: 266 self._parser_state.error("invalid constant", token=token) 267 single_constant = self._arch.constants[token.value] 268 elif token.type == "NUMERIC_CONSTANT": 269 # As `int(_, 0)` in Python != `strtol(_, _, 0)` in C, to make sure 270 # the number parsing behaves exactly in C, instead of using `int()` 271 # directly, we list out all the possible formats for octal, decimal 272 # and hex numbers, and determine the corresponding base by regex. 273 try: 274 if re.match(HEX_REGEX, token.value): 275 base = 16 276 elif re.match(OCTAL_REGEX, token.value): 277 base = 8 278 elif re.match(DECIMAL_REGEX, token.value): 279 base = 10 280 else: 281 # This should never happen. 282 raise ValueError 283 single_constant = int(token.value, base=base) 284 except ValueError: 285 self._parser_state.error("invalid constant", token=token) 286 else: 287 self._parser_state.error("invalid constant", token=token) 288 if single_constant > self._arch.max_unsigned: 289 self._parser_state.error("unsigned overflow", token=token) 290 elif single_constant < self._arch.min_signed: 291 self._parser_state.error("signed underflow", token=token) 292 elif single_constant < 0: 293 # This converts the constant to an unsigned representation of the 294 # same value, since BPF only uses unsigned values. 295 single_constant = self._arch.truncate_word(single_constant) 296 return single_constant 297 298 # constant = [ '~' ] , '(' , value , ')' 299 # | [ '~' ] , single-constant 300 # ; 301 def _parse_constant(self, tokens): 302 negate = False 303 if tokens[0].type == "BITWISE_COMPLEMENT": 304 negate = True 305 tokens.pop(0) 306 if not tokens: 307 self._parser_state.error("empty complement") 308 if tokens[0].type == "BITWISE_COMPLEMENT": 309 self._parser_state.error( 310 "invalid double complement", token=tokens[0] 311 ) 312 if tokens[0].type == "LPAREN": 313 last_open_paren = tokens.pop(0) 314 single_value = self.parse_value(tokens) 315 if not tokens or tokens[0].type != "RPAREN": 316 self._parser_state.error( 317 "unclosed parenthesis", token=last_open_paren 318 ) 319 else: 320 single_value = self._parse_single_constant(tokens[0]) 321 tokens.pop(0) 322 if negate: 323 single_value = self._arch.truncate_word(~single_value) 324 return single_value 325 326 # value = constant , [ { '|' , constant } ] 327 # ; 328 def parse_value(self, tokens): 329 """Parse constants separated bitwise OR operator |. 330 331 Constants can be: 332 333 - A number that can be parsed with strtol() in C. 334 - A named constant expression. 335 - A parenthesized, valid constant expression. 336 - A valid constant expression prefixed with the unary bitwise 337 complement operator ~. 338 - A series of valid constant expressions separated by bitwise 339 OR operator |. 340 341 If there is an error parsing any of the constants, the whole process 342 fails. 343 """ 344 345 value = 0 346 while tokens: 347 value |= self._parse_constant(tokens) 348 if not tokens or tokens[0].type != "BITWISE_OR": 349 break 350 tokens.pop(0) 351 else: 352 self._parser_state.error("empty constant") 353 return value 354 355 # atom = argument , op , value 356 # ; 357 def _parse_atom(self, tokens): 358 if not tokens: 359 self._parser_state.error("missing argument") 360 argument = tokens.pop(0) 361 if argument.type != "ARGUMENT": 362 self._parser_state.error("invalid argument", token=argument) 363 364 if not tokens: 365 self._parser_state.error("missing operator") 366 operator = tokens.pop(0) 367 if operator.type != "OP": 368 self._parser_state.error("invalid operator", token=operator) 369 370 value = self.parse_value(tokens) 371 argument_index = int(argument.value[3:]) 372 if not 0 <= argument_index < bpf.MAX_SYSCALL_ARGUMENTS: 373 self._parser_state.error("invalid argument", token=argument) 374 return Atom(argument_index, operator.value, value) 375 376 # clause = atom , [ { '&&' , atom } ] 377 # ; 378 def _parse_clause(self, tokens): 379 atoms = [] 380 while tokens: 381 atoms.append(self._parse_atom(tokens)) 382 if not tokens or tokens[0].type != "AND": 383 break 384 tokens.pop(0) 385 else: 386 self._parser_state.error("empty clause") 387 return atoms 388 389 # argument-expression = clause , [ { '||' , clause } ] 390 # ; 391 def parse_argument_expression(self, tokens): 392 """Parse a argument expression in Disjunctive Normal Form. 393 394 Since BPF disallows back jumps, we build the basic blocks in reverse 395 order so that all the jump targets are known by the time we need to 396 reference them. 397 """ 398 399 clauses = [] 400 while tokens: 401 clauses.append(self._parse_clause(tokens)) 402 if not tokens or tokens[0].type != "OR": 403 break 404 tokens.pop(0) 405 else: 406 self._parser_state.error("empty argument expression") 407 return clauses 408 409 # default-action = 'kill-process' 410 # | 'kill-thread' 411 # | 'kill' 412 # | 'trap' 413 # | 'user-notify' 414 # ; 415 def _parse_default_action(self, tokens): 416 if not tokens: 417 self._parser_state.error("missing default action") 418 action_token = tokens.pop(0) 419 if action_token.type != "ACTION": 420 return self._parser_state.error( 421 "invalid default action", token=action_token 422 ) 423 if action_token.value == "kill-process": 424 return bpf.KillProcess() 425 if action_token.value == "kill-thread": 426 return bpf.KillThread() 427 if action_token.value == "kill": 428 return self._kill_action 429 if action_token.value == "trap": 430 return bpf.Trap() 431 if action_token.value == "user-notify": 432 return bpf.UserNotify() 433 return self._parser_state.error( 434 "invalid permissive default action", token=action_token 435 ) 436 437 # action = 'allow' | '1' 438 # | 'kill-process' 439 # | 'kill-thread' 440 # | 'kill' 441 # | 'trap' 442 # | 'trace' 443 # | 'log' 444 # | 'user-notify' 445 # | 'return' , single-constant 446 # ; 447 def parse_action(self, tokens): 448 if not tokens: 449 self._parser_state.error("missing action") 450 action_token = tokens.pop(0) 451 # denylist policies must specify a return for every line. 452 if self._denylist: 453 if action_token.type != "RETURN": 454 self._parser_state.error("invalid denylist policy") 455 456 if action_token.type == "ACTION": 457 if action_token.value == "allow": 458 return bpf.Allow() 459 if action_token.value == "kill": 460 return self._kill_action 461 if action_token.value == "kill-process": 462 return bpf.KillProcess() 463 if action_token.value == "kill-thread": 464 return bpf.KillThread() 465 if action_token.value == "trap": 466 return bpf.Trap() 467 if action_token.value == "trace": 468 return bpf.Trace() 469 if action_token.value == "user-notify": 470 return bpf.UserNotify() 471 if action_token.value == "log": 472 return bpf.Log() 473 elif action_token.type == "NUMERIC_CONSTANT": 474 constant = self._parse_single_constant(action_token) 475 if constant == 1: 476 return bpf.Allow() 477 elif action_token.type == "RETURN": 478 if not tokens: 479 self._parser_state.error("missing return value") 480 if self._ret_log: 481 tokens.pop(0) 482 return bpf.Log() 483 else: 484 return bpf.ReturnErrno( 485 self._parse_single_constant(tokens.pop(0)) 486 ) 487 return self._parser_state.error("invalid action", token=action_token) 488 489 # single-filter = action 490 # | argument-expression , [ ';' , action ] 491 # | '!','(', argument-expression, [ ';', action ], ')' 492 # ; 493 def _parse_single_filter(self, tokens): 494 if not tokens: 495 self._parser_state.error("missing filter") 496 if tokens[0].type == "ARGUMENT": 497 # Only argument expressions can start with an ARGUMENT token. 498 argument_expression = self.parse_argument_expression(tokens) 499 if tokens and tokens[0].type == "SEMICOLON": 500 tokens.pop(0) 501 action = self.parse_action(tokens) 502 else: 503 action = bpf.Allow() 504 return Filter(argument_expression, action) 505 else: 506 return Filter(None, self.parse_action(tokens)) 507 508 # filter = '{' , single-filter , [ { ',' , single-filter } ] , '}' 509 # | single-filter 510 # ; 511 def parse_filter(self, tokens): 512 """Parse a filter and return a list of Filter objects.""" 513 if not tokens: 514 self._parser_state.error("missing filter") 515 filters = [] 516 if tokens[0].type == "LBRACE": 517 opening_brace = tokens.pop(0) 518 while tokens: 519 filters.append(self._parse_single_filter(tokens)) 520 if not tokens or tokens[0].type != "COMMA": 521 break 522 tokens.pop(0) 523 if not tokens or tokens[0].type != "RBRACE": 524 self._parser_state.error("unclosed brace", token=opening_brace) 525 tokens.pop(0) 526 else: 527 filters.append(self._parse_single_filter(tokens)) 528 return filters 529 530 # key-value-pair = identifier , '=', identifier , [ { ',' , identifier } ] 531 # ; 532 def _parse_key_value_pair(self, tokens): 533 if not tokens: 534 self._parser_state.error("missing key") 535 key = tokens.pop(0) 536 if key.type != "IDENTIFIER": 537 self._parser_state.error("invalid key", token=key) 538 if not tokens: 539 self._parser_state.error("missing equal") 540 if tokens[0].type != "EQUAL": 541 self._parser_state.error("invalid equal", token=tokens[0]) 542 tokens.pop(0) 543 value_list = [] 544 while tokens: 545 value = tokens.pop(0) 546 if value.type != "IDENTIFIER": 547 self._parser_state.error("invalid value", token=value) 548 value_list.append(value.value) 549 if not tokens or tokens[0].type != "COMMA": 550 break 551 tokens.pop(0) 552 else: 553 self._parser_state.error("empty value") 554 return (key.value, value_list) 555 556 # metadata = '[' , key-value-pair , [ { ';' , key-value-pair } ] , ']' 557 # ; 558 def _parse_metadata(self, tokens): 559 if not tokens: 560 self._parser_state.error("missing opening bracket") 561 opening_bracket = tokens.pop(0) 562 if opening_bracket.type != "LBRACKET": 563 self._parser_state.error( 564 "invalid opening bracket", token=opening_bracket 565 ) 566 metadata = {} 567 while tokens: 568 first_token = tokens[0] 569 key, value = self._parse_key_value_pair(tokens) 570 if key in metadata: 571 self._parser_state.error( 572 'duplicate metadata key: "%s"' % key, token=first_token 573 ) 574 metadata[key] = value 575 if not tokens or tokens[0].type != "SEMICOLON": 576 break 577 tokens.pop(0) 578 if not tokens or tokens[0].type != "RBRACKET": 579 self._parser_state.error("unclosed bracket", token=opening_bracket) 580 tokens.pop(0) 581 return metadata 582 583 # syscall-descriptor = syscall-name , [ metadata ] 584 # | syscall-group-name , [ metadata ] 585 # ; 586 def _parse_syscall_descriptor(self, tokens): 587 if not tokens: 588 self._parser_state.error("missing syscall descriptor") 589 syscall_descriptor = tokens.pop(0) 590 # `kill` as a syscall name is a special case since kill is also a valid 591 # action and actions have precendence over identifiers. 592 if ( 593 syscall_descriptor.type != "IDENTIFIER" 594 and syscall_descriptor.value != "kill" 595 ): 596 self._parser_state.error( 597 "invalid syscall descriptor", token=syscall_descriptor 598 ) 599 if tokens and tokens[0].type == "LBRACKET": 600 metadata = self._parse_metadata(tokens) 601 if ( 602 "arch" in metadata 603 and self._arch.arch_name not in metadata["arch"] 604 ): 605 return () 606 if "@" in syscall_descriptor.value: 607 # This is a syscall group. 608 subtokens = syscall_descriptor.value.split("@") 609 if len(subtokens) != 2: 610 self._parser_state.error( 611 "invalid syscall group name", token=syscall_descriptor 612 ) 613 syscall_group_name, syscall_namespace_name = subtokens 614 if syscall_namespace_name not in self._arch.syscall_groups: 615 self._parser_state.error( 616 "nonexistent syscall group namespace", 617 token=syscall_descriptor, 618 ) 619 syscall_namespace = self._arch.syscall_groups[ 620 syscall_namespace_name 621 ] 622 if syscall_group_name not in syscall_namespace: 623 self._parser_state.error( 624 "nonexistent syscall group", token=syscall_descriptor 625 ) 626 return ( 627 Syscall(name, self._arch.syscalls[name]) 628 for name in syscall_namespace[syscall_group_name] 629 ) 630 if syscall_descriptor.value not in self._arch.syscalls: 631 self._parser_state.error( 632 "nonexistent syscall", token=syscall_descriptor 633 ) 634 return ( 635 Syscall( 636 syscall_descriptor.value, 637 self._arch.syscalls[syscall_descriptor.value], 638 ), 639 ) 640 641 # filter-statement = 642 # '{' , syscall-descriptor , [ { ',', syscall-descriptor } ] , '}' , 643 # ':' , filter 644 # | syscall-descriptor , ':' , filter 645 # ; 646 def parse_filter_statement(self, tokens): 647 """Parse a filter statement and return a ParsedFilterStatement.""" 648 if not tokens: 649 self._parser_state.error("empty filter statement") 650 syscall_descriptors = [] 651 if tokens[0].type == "LBRACE": 652 opening_brace = tokens.pop(0) 653 while tokens: 654 syscall_descriptors.extend( 655 self._parse_syscall_descriptor(tokens) 656 ) 657 if not tokens or tokens[0].type != "COMMA": 658 break 659 tokens.pop(0) 660 if not tokens or tokens[0].type != "RBRACE": 661 self._parser_state.error("unclosed brace", token=opening_brace) 662 tokens.pop(0) 663 else: 664 syscall_descriptors.extend(self._parse_syscall_descriptor(tokens)) 665 if not tokens: 666 self._parser_state.error("missing colon") 667 if tokens[0].type != "COLON": 668 self._parser_state.error("invalid colon", token=tokens[0]) 669 # Given that there can be multiple syscalls and filters in a single 670 # filter statement, use the colon token as the anchor for error location 671 # purposes. 672 colon_token = tokens.pop(0) 673 parsed_filter = self.parse_filter(tokens) 674 if not syscall_descriptors: 675 return None 676 return ParsedFilterStatement( 677 tuple(syscall_descriptors), parsed_filter, colon_token 678 ) 679 680 # include-statement = '@include' , posix-path 681 # ; 682 def _parse_include_statement(self, tokens): 683 if not tokens: 684 self._parser_state.error("empty filter statement") 685 if tokens[0].type != "INCLUDE": 686 self._parser_state.error("invalid include", token=tokens[0]) 687 tokens.pop(0) 688 if not tokens: 689 self._parser_state.error("empty include path") 690 include_path = tokens.pop(0) 691 if include_path.type != "PATH": 692 self._parser_state.error("invalid include path", token=include_path) 693 if len(self._parser_states) == self._include_depth_limit: 694 self._parser_state.error("@include statement nested too deep") 695 include_filename = os.path.normpath( 696 os.path.join( 697 os.path.dirname(self._parser_state.filename), include_path.value 698 ) 699 ) 700 if not os.path.isfile(include_filename): 701 self._parser_state.error( 702 "Could not @include %s" % include_filename, token=include_path 703 ) 704 return self._parse_policy_file(include_filename) 705 706 def _parse_frequency_file(self, filename): 707 self._parser_states.append(ParserState(filename)) 708 try: 709 frequency_mapping = collections.defaultdict(int) 710 with open(filename, encoding="utf-8") as frequency_file: 711 for tokens in self._parser_state.tokenize(frequency_file): 712 syscall_numbers = self._parse_syscall_descriptor(tokens) 713 if not tokens: 714 self._parser_state.error("missing colon") 715 if tokens[0].type != "COLON": 716 self._parser_state.error( 717 "invalid colon", token=tokens[0] 718 ) 719 tokens.pop(0) 720 721 if not tokens: 722 self._parser_state.error("missing number") 723 number = tokens.pop(0) 724 if number.type != "NUMERIC_CONSTANT": 725 self._parser_state.error("invalid number", token=number) 726 number_value = int(number.value, base=0) 727 if number_value < 0: 728 self._parser_state.error("invalid number", token=number) 729 730 for syscall_number in syscall_numbers: 731 frequency_mapping[syscall_number] += number_value 732 return frequency_mapping 733 finally: 734 self._parser_states.pop() 735 736 # frequency-statement = '@frequency' , posix-path 737 # ; 738 def _parse_frequency_statement(self, tokens): 739 if not tokens: 740 self._parser_state.error("empty frequency statement") 741 if tokens[0].type != "FREQUENCY": 742 self._parser_state.error("invalid frequency", token=tokens[0]) 743 tokens.pop(0) 744 if not tokens: 745 self._parser_state.error("empty frequency path") 746 frequency_path = tokens.pop(0) 747 if frequency_path.type != "PATH": 748 self._parser_state.error( 749 "invalid frequency path", token=frequency_path 750 ) 751 frequency_filename = os.path.normpath( 752 os.path.join( 753 os.path.dirname(self._parser_state.filename), 754 frequency_path.value, 755 ) 756 ) 757 if not os.path.isfile(frequency_filename): 758 self._parser_state.error( 759 "Could not open frequency file %s" % frequency_filename, 760 token=frequency_path, 761 ) 762 return self._parse_frequency_file(frequency_filename) 763 764 # default-statement = '@default' , default-action 765 # ; 766 def _parse_default_statement(self, tokens): 767 if not tokens: 768 self._parser_state.error("empty default statement") 769 if tokens[0].type != "DEFAULT": 770 self._parser_state.error("invalid default", token=tokens[0]) 771 tokens.pop(0) 772 if not tokens: 773 self._parser_state.error("empty action") 774 return self._parse_default_action(tokens) 775 776 def _parse_policy_file(self, filename): 777 self._parser_states.append(ParserState(filename)) 778 try: 779 statements = [] 780 denylist_header = False 781 with open(filename, encoding="utf-8") as policy_file: 782 for tokens in self._parser_state.tokenize(policy_file): 783 if tokens[0].type == "INCLUDE": 784 statements.extend(self._parse_include_statement(tokens)) 785 elif tokens[0].type == "FREQUENCY": 786 for ( 787 syscall_number, 788 frequency, 789 ) in self._parse_frequency_statement(tokens).items(): 790 self._frequency_mapping[syscall_number] += frequency 791 elif tokens[0].type == "DEFAULT": 792 self._default_action = self._parse_default_statement( 793 tokens 794 ) 795 elif tokens[0].type == "DENYLIST": 796 tokens.pop() 797 if not self._denylist: 798 self._parser_state.error( 799 "policy is denylist, but " 800 "flag --denylist not " 801 "passed in." 802 ) 803 else: 804 denylist_header = True 805 else: 806 statement = self.parse_filter_statement(tokens) 807 if statement is None: 808 # If all the syscalls in the statement are for 809 # another arch, skip the whole statement. 810 continue 811 statements.append(statement) 812 813 if tokens: 814 self._parser_state.error( 815 "extra tokens", token=tokens[0] 816 ) 817 if self._denylist and not denylist_header: 818 self._parser_state.error( 819 "policy must contain @denylist flag to" 820 " be compiled with --denylist flag." 821 ) 822 return statements 823 finally: 824 self._parser_states.pop() 825 826 def parse_file(self, filename): 827 """Parse a file and return the list of FilterStatements.""" 828 self._frequency_mapping = collections.defaultdict(int) 829 try: 830 statements = self._parse_policy_file(filename) 831 except RecursionError: 832 raise ParseException( 833 "recursion limit exceeded", 834 filename, 835 line=self._parser_states[-1].line, 836 ) 837 838 # Collapse statements into a single syscall-to-filter-list, remembering 839 # the token for each filter for better diagnostics. 840 syscall_filter_mapping = {} 841 syscall_filter_definitions = {} 842 filter_statements = [] 843 for syscalls, filters, token in statements: 844 for syscall in syscalls: 845 if syscall not in syscall_filter_mapping: 846 filter_statements.append( 847 FilterStatement( 848 syscall, self._frequency_mapping.get(syscall, 1), [] 849 ) 850 ) 851 syscall_filter_mapping[syscall] = filter_statements[-1] 852 syscall_filter_definitions[syscall] = [] 853 for filt in filters: 854 syscall_filter_mapping[syscall].filters.append(filt) 855 syscall_filter_definitions[syscall].append(token) 856 default_action = self._override_default_action or self._default_action 857 for filter_statement in filter_statements: 858 unconditional_actions_suffix = list( 859 itertools.dropwhile( 860 lambda filt: filt.expression is not None, 861 filter_statement.filters, 862 ) 863 ) 864 if len(unconditional_actions_suffix) == 1: 865 # The last filter already has an unconditional action, no need 866 # to add another one. 867 continue 868 if len(unconditional_actions_suffix) > 1: 869 previous_definition_token = syscall_filter_definitions[ 870 filter_statement.syscall 871 ][-len(unconditional_actions_suffix)] 872 current_definition_token = syscall_filter_definitions[ 873 filter_statement.syscall 874 ][-len(unconditional_actions_suffix) + 1] 875 raise ParseException( 876 ( 877 "Syscall %s (number %d) already had " 878 "an unconditional action applied" 879 ) 880 % ( 881 filter_statement.syscall.name, 882 filter_statement.syscall.number, 883 ), 884 filename=current_definition_token.filename, 885 token=current_definition_token, 886 ) from ParseException( 887 "Previous definition", 888 filename=previous_definition_token.filename, 889 token=previous_definition_token, 890 ) 891 assert not unconditional_actions_suffix 892 filter_statement.filters.append( 893 Filter(expression=None, action=default_action) 894 ) 895 return ParsedPolicy(default_action, filter_statements) 896