#!/usr/bin/env python
#
# (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This software has been sponsored by Sophos Astaro <http://www.sophos.com>
#

from __future__ import print_function
import sys
import os
import subprocess
import argparse

IPTABLES = "iptables"
IP6TABLES = "ip6tables"
ARPTABLES = "arptables"
EBTABLES = "ebtables"

IPTABLES_SAVE = "iptables-save"
IP6TABLES_SAVE = "ip6tables-save"
ARPTABLES_SAVE = "arptables-save"
EBTABLES_SAVE = "ebtables-save"
#IPTABLES_SAVE = ['xtables-save','-4']
#IP6TABLES_SAVE = ['xtables-save','-6']

EXTENSIONS_PATH = "extensions"
LOGFILE = "/tmp/iptables-test.log"
log_file = None

# Multi-call binary every iptables-family command is dispatched through.
# main() overrides it when --nftables is given; the default keeps the helper
# functions usable when this module is imported rather than run.
EXECUTEABLE = "xtables-legacy-multi"

# Name of the scratch network namespace used by the --netns mode.
NETNS_NAME = "____iptables-container-test"

# Save command for each single-word tool name.
_SAVE_BY_COMMAND = {
    IPTABLES: IPTABLES_SAVE,
    IP6TABLES: IP6TABLES_SAVE,
    ARPTABLES: ARPTABLES_SAVE,
    EBTABLES: EBTABLES_SAVE,
}

# Save command for two-word invocations of the form "<tool> -4" / "<tool> -6".
_SAVE_BY_FAMILY_FLAG = {
    '-4': IPTABLES_SAVE,
    '-6': IP6TABLES_SAVE,
}


class Colors:
    '''
    ANSI escape sequences used to colorize terminal output.
    '''
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    ENDC = '\033[0m'


def print_error(reason, filename=None, lineno=None):
    '''
    Prints an error with nice colors, indicating file and line number.

    :param reason: string describing what went wrong
    :param filename: name of the file tested; omitted from the output if None
    :param lineno: line number being tested; omitted from the output if None
    '''
    # Both location arguments default to None, so neither may be concatenated
    # or %d-formatted unconditionally.
    if filename is None:
        prefix = ""
    else:
        prefix = filename + ": "

    if lineno is None:
        detail = ": (%s)" % reason
    else:
        detail = ": line %d (%s)" % (lineno, reason)

    print(prefix + Colors.RED + "ERROR" + Colors.ENDC + detail)


def _netns_exec(cmd):
    '''
    Returns cmd wrapped so that it runs inside the test network namespace.
    '''
    return "ip netns exec " + NETNS_NAME + " " + cmd


def _save_command(iptables):
    '''
    Returns the *-save command matching an iptables command string, or None
    if the string is not one of the recognized forms.
    '''
    words = iptables.split(" ")
    if len(words) == 2:
        return _SAVE_BY_FAMILY_FLAG.get(words[1])
    if len(words) == 1:
        return _SAVE_BY_COMMAND.get(words[0])
    return None


def delete_rule(iptables, rule, filename, lineno, netns=False):
    '''
    Removes an iptables rule. Returns 0 on success, -1 on failure.

    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments of the rule to remove
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: if true, delete the rule inside the test network namespace
    '''
    cmd = iptables + " -D " + rule
    if netns:
        # The rule was added inside the namespace, so it has to be removed
        # there as well; otherwise the deletion would be attempted against
        # the ruleset of the namespace this script runs in.
        cmd = _netns_exec(EXECUTEABLE + " " + cmd)

    ret = execute_cmd(cmd, filename, lineno)
    # Any non-zero status means the rule was not removed, not just status 1.
    if ret:
        reason = "cannot delete: " + iptables + " -I " + rule
        print_error(reason, filename, lineno)
        return -1

    return 0


def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
    '''
    Executes an unit test. Returns 0 if the test passed and -1 otherwise.

    Parameters:
    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments for the rule to test
    :param rule_save: string to find the rule in the output of iptables -save
    :param res: expected result of the rule. Valid values: "OK", "FAIL"
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: if true, run the commands inside the test network namespace
    '''
    cmd = iptables + " -A " + rule
    if netns:
        cmd = _netns_exec(EXECUTEABLE + " " + cmd)

    ret = execute_cmd(cmd, filename, lineno)

    #
    # report failed test
    #
    if ret:
        if res == "OK":
            reason = "cannot load: " + cmd
            print_error(reason, filename, lineno)
            return -1
        # the rule was expected to be rejected: do not report this error
        return 0

    if res == "FAIL":
        reason = "should fail: " + cmd
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    save = _save_command(iptables)
    if save is None:
        # Without a save command the rule cannot be verified; report it
        # instead of failing later on an unbound variable.
        reason = "no save command known for: " + iptables
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    command = EXECUTEABLE + " " + save
    if netns:
        command = _netns_exec(command)

    proc = subprocess.Popen(command, shell=True,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, _ = proc.communicate()

    #
    # check for segfaults
    #
    if proc.returncode == -11:
        reason = "iptables-save segfaults: " + cmd
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    # find the rule (the save output is bytes, so compare as bytes)
    if out.find(rule_save.encode('utf-8')) < 0:
        reason = "cannot find: " + iptables + " -I " + rule
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    # Test "ip netns del NETNS" path with rules in place
    if netns:
        return 0

    return delete_rule(iptables, rule, filename, lineno)


def execute_cmd(cmd, filename, lineno):
    '''
    Executes a command, checking for segfaults and returning the command exit
    code.

    :param cmd: string with the command to be executed
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    '''
    # Bare tool names are routed through the selected multi-call binary.
    if cmd.startswith(('iptables ', 'ip6tables ', 'ebtables ', 'arptables ')):
        cmd = EXECUTEABLE + " " + cmd

    # With no log file open, print() and subprocess fall back to the
    # inherited stdout.
    print("command: {}".format(cmd), file=log_file)
    ret = subprocess.call(cmd, shell=True, universal_newlines=True,
                          stderr=subprocess.STDOUT, stdout=log_file)
    if log_file is not None:
        log_file.flush()

    # generic check for segfaults
    if ret == -11:
        reason = "command segfaults: " + cmd
        print_error(reason, filename, lineno)
    return ret


def _run_rule_line(iptables, line, table, chain_array, filename, lineno,
                   netns):
    '''
    Runs one "rule;save-format;result" test line against every chain in
    chain_array. Returns True if the line passed for all chains.
    '''
    item = line.split(";")
    if len(item) < 3:
        print_error("malformed test line: " + line.rstrip(), filename, lineno)
        return False

    res = item[2].rstrip()
    for chain in chain_array:
        if table == "":
            rule = chain + " " + item[0]
        else:
            rule = chain + " -t " + table + " " + item[0]

        # "=" means the rule is saved exactly as it was written
        if item[1] == "=":
            rule_save = chain + " " + item[0]
        else:
            rule_save = chain + " " + item[1]

        if run_test(iptables, rule, rule_save, res, filename, lineno,
                    netns) < 0:
            return False

    return True


def run_test_file(filename, netns):
    '''
    Runs a test file. Returns a (tests, passed) tuple with the number of
    unit tests found in the file and how many of them passed.

    Exits the program if a rule line appears before any ":" chain line.

    :param filename: name of the file with the test rules
    :param netns: if true, run the tests inside a scratch network namespace
    '''
    #
    # if this is not a test file, skip.
    #
    if not filename.endswith(".t"):
        return 0, 0

    if "libipt_" in filename:
        iptables = IPTABLES
    elif "libip6t_" in filename:
        iptables = IP6TABLES
    elif "libxt_" in filename:
        iptables = IPTABLES
    elif "libarpt_" in filename:
        # only supported with nf_tables backend
        if EXECUTEABLE != "xtables-nft-multi":
            return 0, 0
        iptables = ARPTABLES
    elif "libebt_" in filename:
        # only supported with nf_tables backend
        if EXECUTEABLE != "xtables-nft-multi":
            return 0, 0
        iptables = EBTABLES
    else:
        # default to iptables if not known prefix
        iptables = IPTABLES

    tests = 0
    passed = 0
    table = ""
    # Must start out empty so that a rule line seen before any ":" line
    # reaches the "missing chain" check below.
    chain_array = []
    total_test_passed = True

    with open(filename) as f:
        if netns:
            execute_cmd("ip netns add " + NETNS_NAME, filename, 0)

        try:
            for lineno, line in enumerate(f, 1):
                if line[0] == "#" or not line.strip():
                    continue

                marker = line[0]
                payload = line.rstrip()[1:]

                if marker == ":":
                    chain_array = payload.split(",")
                    continue

                # external non-iptables invocation, executed as is.
                if marker == "@":
                    external_cmd = payload
                    if netns:
                        external_cmd = _netns_exec(external_cmd)
                    execute_cmd(external_cmd, filename, lineno)
                    continue

                # external iptables invocation, executed as is.
                if marker == "%":
                    external_cmd = payload
                    if netns:
                        external_cmd = _netns_exec(EXECUTEABLE + " " +
                                                   external_cmd)
                    execute_cmd(external_cmd, filename, lineno)
                    continue

                if marker == "*":
                    table = payload
                    continue

                if not chain_array:
                    print("broken test, missing chain, leaving")
                    sys.exit()

                tests += 1
                if _run_rule_line(iptables, line, table, chain_array,
                                  filename, lineno, netns):
                    passed += 1
                else:
                    total_test_passed = False
        finally:
            # Remove the namespace even if the run is aborted.
            if netns:
                execute_cmd("ip netns del " + NETNS_NAME, filename, 0)

    if total_test_passed:
        print(filename + ": " + Colors.GREEN + "OK" + Colors.ENDC)

    return tests, passed


def show_missing():
    '''
    Show the list of missing test files
    '''
    file_list = os.listdir(EXTENSIONS_PATH)
    testfiles = set(i for i in file_list if i.endswith('.t'))
    libfiles = [i for i in file_list
                if i.startswith('lib') and i.endswith('.c')]

    def test_name(x):
        # libfoo.c -> libfoo.t
        return x[0:-2] + '.t'

    missing = [test_name(i) for i in libfiles
               if test_name(i) not in testfiles]

    print('\n'.join(missing))


#
# main
#
def main():
    '''
    Parses the command line and runs the requested test files, printing a
    summary of test files, unit tests and passed tests at the end.
    '''
    global EXECUTEABLE
    global log_file

    parser = argparse.ArgumentParser(description='Run iptables tests')
    parser.add_argument('filename', nargs='*',
                        metavar='path/to/file.t',
                        help='Run only this test')
    parser.add_argument('-H', '--host', action='store_true',
                        help='Run tests against installed binaries')
    parser.add_argument('-l', '--legacy', action='store_true',
                        help='Test iptables-legacy')
    parser.add_argument('-m', '--missing', action='store_true',
                        help='Check for missing tests')
    parser.add_argument('-n', '--nftables', action='store_true',
                        help='Test iptables-over-nftables')
    parser.add_argument('-N', '--netns', action='store_true',
                        help='Test netnamespace path')
    args = parser.parse_args()

    #
    # show list of missing test files
    #
    if args.missing:
        show_missing()
        return

    EXECUTEABLE = "xtables-legacy-multi"
    if args.nftables:
        EXECUTEABLE = "xtables-nft-multi"

    if os.getuid() != 0:
        print("You need to be root to run this, sorry")
        return

    if not args.host:
        # Assign through os.environ rather than os.putenv() so that
        # os.environ and the environment passed to children stay in sync.
        os.environ["XTABLES_LIBDIR"] = os.path.abspath(EXTENSIONS_PATH)
        os.environ["PATH"] = "%s/iptables:%s" % (
            os.path.abspath(os.path.curdir), os.getenv("PATH"))

    test_files = 0
    tests = 0
    passed = 0

    # setup global var log file
    try:
        log_file = open(LOGFILE, 'w')
    except IOError:
        print("Couldn't open log file %s" % LOGFILE)
        return

    try:
        if args.filename:
            file_list = args.filename
        else:
            file_list = [os.path.join(EXTENSIONS_PATH, i)
                         for i in os.listdir(EXTENSIONS_PATH)
                         if i.endswith('.t')]
            file_list.sort()

        if not args.netns:
            # Best effort: the unshare module is optional.
            try:
                import unshare
                unshare.unshare(unshare.CLONE_NEWNET)
            except Exception:
                print("Cannot run in own namespace, connectivity might break")

        for filename in file_list:
            file_tests, file_passed = run_test_file(filename, args.netns)
            if file_tests:
                tests += file_tests
                passed += file_passed
                test_files += 1

        print("%d test files, %d unit tests, %d passed" %
              (test_files, tests, passed))
    finally:
        log_file.close()
        log_file = None


if __name__ == '__main__':
    main()