• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This software has been sponsored by Sophos Astaro <http://www.sophos.com>
11#
12
13from __future__ import print_function
14import sys
15import os
16import subprocess
17import argparse
18
19IPTABLES = "iptables"
20IP6TABLES = "ip6tables"
21ARPTABLES = "arptables"
22EBTABLES = "ebtables"
23
24IPTABLES_SAVE = "iptables-save"
25IP6TABLES_SAVE = "ip6tables-save"
26ARPTABLES_SAVE = "arptables-save"
27EBTABLES_SAVE = "ebtables-save"
28#IPTABLES_SAVE = ['xtables-save','-4']
29#IP6TABLES_SAVE = ['xtables-save','-6']
30
31EXTENSIONS_PATH = "extensions"
32LOGFILE="/tmp/iptables-test.log"
33log_file = None
34
35
36class Colors:
37    HEADER = '\033[95m'
38    BLUE = '\033[94m'
39    GREEN = '\033[92m'
40    YELLOW = '\033[93m'
41    RED = '\033[91m'
42    ENDC = '\033[0m'
43
44
45def print_error(reason, filename=None, lineno=None):
46    '''
47    Prints an error with nice colors, indicating file and line number.
48    '''
49    print(filename + ": " + Colors.RED + "ERROR" +
50        Colors.ENDC + ": line %d (%s)" % (lineno, reason))
51
52
53def delete_rule(iptables, rule, filename, lineno):
54    '''
55    Removes an iptables rule
56    '''
57    cmd = iptables + " -D " + rule
58    ret = execute_cmd(cmd, filename, lineno)
59    if ret == 1:
60        reason = "cannot delete: " + iptables + " -I " + rule
61        print_error(reason, filename, lineno)
62        return -1
63
64    return 0
65
66
67def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
68    '''
69    Executes an unit test. Returns the output of delete_rule().
70
71    Parameters:
72    :param  iptables: string with the iptables command to execute
73    :param rule: string with iptables arguments for the rule to test
74    :param rule_save: string to find the rule in the output of iptables -save
75    :param res: expected result of the rule. Valid values: "OK", "FAIL"
76    :param filename: name of the file tested (used for print_error purposes)
77    :param lineno: line number being tested (used for print_error purposes)
78    '''
79    ret = 0
80
81    cmd = iptables + " -A " + rule
82    if netns:
83            cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + cmd
84
85    ret = execute_cmd(cmd, filename, lineno)
86
87    #
88    # report failed test
89    #
90    if ret:
91        if res == "OK":
92            reason = "cannot load: " + cmd
93            print_error(reason, filename, lineno)
94            return -1
95        else:
96            # do not report this error
97            return 0
98    else:
99        if res == "FAIL":
100            reason = "should fail: " + cmd
101            print_error(reason, filename, lineno)
102            delete_rule(iptables, rule, filename, lineno)
103            return -1
104
105    matching = 0
106    splitted = iptables.split(" ")
107    if len(splitted) == 2:
108        if splitted[1] == '-4':
109            command = IPTABLES_SAVE
110        elif splitted[1] == '-6':
111            command = IP6TABLES_SAVE
112    elif len(splitted) == 1:
113        if splitted[0] == IPTABLES:
114            command = IPTABLES_SAVE
115        elif splitted[0] == IP6TABLES:
116            command = IP6TABLES_SAVE
117        elif splitted[0] == ARPTABLES:
118            command = ARPTABLES_SAVE
119        elif splitted[0] == EBTABLES:
120            command = EBTABLES_SAVE
121
122    command = EXECUTEABLE + " " + command
123
124    if netns:
125            command = "ip netns exec ____iptables-container-test " + command
126
127    args = splitted[1:]
128    proc = subprocess.Popen(command, shell=True,
129                            stdin=subprocess.PIPE,
130                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
131    out, err = proc.communicate()
132
133    #
134    # check for segfaults
135    #
136    if proc.returncode == -11:
137        reason = "iptables-save segfaults: " + cmd
138        print_error(reason, filename, lineno)
139        delete_rule(iptables, rule, filename, lineno)
140        return -1
141
142    # find the rule
143    matching = out.find(rule_save.encode('utf-8'))
144    if matching < 0:
145        reason = "cannot find: " + iptables + " -I " + rule
146        print_error(reason, filename, lineno)
147        delete_rule(iptables, rule, filename, lineno)
148        return -1
149
150    # Test "ip netns del NETNS" path with rules in place
151    if netns:
152        return 0
153
154    return delete_rule(iptables, rule, filename, lineno)
155
156def execute_cmd(cmd, filename, lineno):
157    '''
158    Executes a command, checking for segfaults and returning the command exit
159    code.
160
161    :param cmd: string with the command to be executed
162    :param filename: name of the file tested (used for print_error purposes)
163    :param lineno: line number being tested (used for print_error purposes)
164    '''
165    global log_file
166    if cmd.startswith('iptables ') or cmd.startswith('ip6tables ') or cmd.startswith('ebtables ') or cmd.startswith('arptables '):
167        cmd = EXECUTEABLE + " " + cmd
168
169    print("command: {}".format(cmd), file=log_file)
170    ret = subprocess.call(cmd, shell=True, universal_newlines=True,
171        stderr=subprocess.STDOUT, stdout=log_file)
172    log_file.flush()
173
174    # generic check for segfaults
175    if ret  == -11:
176        reason = "command segfaults: " + cmd
177        print_error(reason, filename, lineno)
178    return ret
179
180
181def run_test_file(filename, netns):
182    '''
183    Runs a test file
184
185    :param filename: name of the file with the test rules
186    '''
187    #
188    # if this is not a test file, skip.
189    #
190    if not filename.endswith(".t"):
191        return 0, 0
192
193    if "libipt_" in filename:
194        iptables = IPTABLES
195    elif "libip6t_" in filename:
196        iptables = IP6TABLES
197    elif "libxt_"  in filename:
198        iptables = IPTABLES
199    elif "libarpt_" in filename:
200        # only supported with nf_tables backend
201        if EXECUTEABLE != "xtables-nft-multi":
202           return 0, 0
203        iptables = ARPTABLES
204    elif "libebt_" in filename:
205        # only supported with nf_tables backend
206        if EXECUTEABLE != "xtables-nft-multi":
207           return 0, 0
208        iptables = EBTABLES
209    else:
210        # default to iptables if not known prefix
211        iptables = IPTABLES
212
213    f = open(filename)
214
215    tests = 0
216    passed = 0
217    table = ""
218    total_test_passed = True
219
220    if netns:
221        execute_cmd("ip netns add ____iptables-container-test", filename, 0)
222
223    for lineno, line in enumerate(f):
224        if line[0] == "#" or len(line.strip()) == 0:
225            continue
226
227        if line[0] == ":":
228            chain_array = line.rstrip()[1:].split(",")
229            continue
230
231        # external non-iptables invocation, executed as is.
232        if line[0] == "@":
233            external_cmd = line.rstrip()[1:]
234            if netns:
235                external_cmd = "ip netns exec ____iptables-container-test " + external_cmd
236            execute_cmd(external_cmd, filename, lineno)
237            continue
238
239        # external iptables invocation, executed as is.
240        if line[0] == "%":
241            external_cmd = line.rstrip()[1:]
242            if netns:
243                external_cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + external_cmd
244            execute_cmd(external_cmd, filename, lineno)
245            continue
246
247        if line[0] == "*":
248            table = line.rstrip()[1:]
249            continue
250
251        if len(chain_array) == 0:
252            print("broken test, missing chain, leaving")
253            sys.exit()
254
255        test_passed = True
256        tests += 1
257
258        for chain in chain_array:
259            item = line.split(";")
260            if table == "":
261                rule = chain + " " + item[0]
262            else:
263                rule = chain + " -t " + table + " " + item[0]
264
265            if item[1] == "=":
266                rule_save = chain + " " + item[0]
267            else:
268                rule_save = chain + " " + item[1]
269
270            res = item[2].rstrip()
271            ret = run_test(iptables, rule, rule_save,
272                           res, filename, lineno + 1, netns)
273
274            if ret < 0:
275                test_passed = False
276                total_test_passed = False
277                break
278
279        if test_passed:
280            passed += 1
281
282    if netns:
283        execute_cmd("ip netns del ____iptables-container-test", filename, 0)
284    if total_test_passed:
285        print(filename + ": " + Colors.GREEN + "OK" + Colors.ENDC)
286
287    f.close()
288    return tests, passed
289
290
291def show_missing():
292    '''
293    Show the list of missing test files
294    '''
295    file_list = os.listdir(EXTENSIONS_PATH)
296    testfiles = [i for i in file_list if i.endswith('.t')]
297    libfiles = [i for i in file_list
298                if i.startswith('lib') and i.endswith('.c')]
299
300    def test_name(x):
301        return x[0:-2] + '.t'
302    missing = [test_name(i) for i in libfiles
303               if not test_name(i) in testfiles]
304
305    print('\n'.join(missing))
306
307
308#
309# main
310#
311def main():
312    parser = argparse.ArgumentParser(description='Run iptables tests')
313    parser.add_argument('filename', nargs='*',
314                        metavar='path/to/file.t',
315                        help='Run only this test')
316    parser.add_argument('-H', '--host', action='store_true',
317                        help='Run tests against installed binaries')
318    parser.add_argument('-l', '--legacy', action='store_true',
319                        help='Test iptables-legacy')
320    parser.add_argument('-m', '--missing', action='store_true',
321                        help='Check for missing tests')
322    parser.add_argument('-n', '--nftables', action='store_true',
323                        help='Test iptables-over-nftables')
324    parser.add_argument('-N', '--netns', action='store_true',
325                        help='Test netnamespace path')
326    args = parser.parse_args()
327
328    #
329    # show list of missing test files
330    #
331    if args.missing:
332        show_missing()
333        return
334
335    global EXECUTEABLE
336    EXECUTEABLE = "xtables-legacy-multi"
337    if args.nftables:
338        EXECUTEABLE = "xtables-nft-multi"
339
340    if os.getuid() != 0:
341        print("You need to be root to run this, sorry")
342        return
343
344    if not args.host:
345        os.putenv("XTABLES_LIBDIR", os.path.abspath(EXTENSIONS_PATH))
346        os.putenv("PATH", "%s/iptables:%s" % (os.path.abspath(os.path.curdir),
347                                              os.getenv("PATH")))
348
349    test_files = 0
350    tests = 0
351    passed = 0
352
353    # setup global var log file
354    global log_file
355    try:
356        log_file = open(LOGFILE, 'w')
357    except IOError:
358        print("Couldn't open log file %s" % LOGFILE)
359        return
360
361    if args.filename:
362        file_list = args.filename
363    else:
364        file_list = [os.path.join(EXTENSIONS_PATH, i)
365                     for i in os.listdir(EXTENSIONS_PATH)
366                     if i.endswith('.t')]
367        file_list.sort()
368
369    if not args.netns:
370        try:
371            import unshare
372            unshare.unshare(unshare.CLONE_NEWNET)
373        except:
374            print("Cannot run in own namespace, connectivity might break")
375
376    for filename in file_list:
377        file_tests, file_passed = run_test_file(filename, args.netns)
378        if file_tests:
379            tests += file_tests
380            passed += file_passed
381            test_files += 1
382
383    print("%d test files, %d unit tests, %d passed" % (test_files, tests, passed))
384
385
386if __name__ == '__main__':
387    main()
388