• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import json
15import subprocess
16from collections import OrderedDict
17from string import Template
18
19from tdc_config import *
20from tdc_helper import *
21
22
# When True, exec_cmd() runs commands inside the $NS network namespace by
# default, and ns_create()/ns_destroy() set that namespace up and tear it down.
USE_NS = True
24
25
def replace_keywords(cmd):
    """
    Expand the $-prefixed placeholders found in a command string.

    Values are looked up in the NAMES mapping. safe_substitute() is used,
    so a placeholder with no matching entry is left in the string as-is
    rather than raising an error.
    """
    return Template(cmd).safe_substitute(NAMES)
34
35
def exec_cmd(command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    If namespaces are enabled (USE_NS) and nsonly is true, the command is
    prefixed so that it runs inside the $NS network namespace. Any
    $-placeholders in the command are then expanded by replace_keywords().

    Returns a (proc, output) tuple. proc is the finished subprocess.Popen
    object; output is the decoded stderr if the command exited with a
    non-zero status and the decoded stdout otherwise.
    """
    if nsonly and USE_NS:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    (rawout, serr) = proc.communicate()

    # Report the error stream for failed commands, the normal output otherwise.
    selected = serr if proc.returncode != 0 else rawout
    # Command output is not guaranteed to be valid UTF-8; substitute
    # undecodable bytes instead of aborting the run with UnicodeDecodeError.
    foutput = selected.decode("utf-8", errors="replace")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
61
62
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case. Optionally
    terminate test execution if the command fails.

    Each entry of cmdlist is either a command string, which must exit
    with status 0, or a list whose first element is the command and whose
    remaining elements are the exit codes to accept. Empty commands are
    skipped.

    If a command exits with an unaccepted status, the command and its
    output are printed, the test namespace is destroyed and the script
    exits with status 1.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            cmd = cmdinfo[0]
            exit_codes = cmdinfo[1:]
        else:
            cmd = cmdinfo
            exit_codes = [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # A bare `print` is a no-op expression in Python 3; call it so
            # the blank separator line is actually emitted.
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            ns_destroy()
            sys.exit(1)
90
91
92def test_runner(filtered_tests, args):
93    """
94    Driver function for the unit tests.
95
96    Prints information about the tests being run, executes the setup and
97    teardown commands and the command under test itself. Also determines
98    success/failure based on the information in the test case and generates
99    TAP output accordingly.
100    """
101    testlist = filtered_tests
102    tcount = len(testlist)
103    index = 1
104    tap = str(index) + ".." + str(tcount) + "\n"
105
106    for tidx in testlist:
107        result = True
108        tresult = ""
109        if "flower" in tidx["category"] and args.device == None:
110            continue
111        print("Test " + tidx["id"] + ": " + tidx["name"])
112        prepare_env(tidx["setup"])
113        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
114        exit_code = p.returncode
115
116        if (exit_code != int(tidx["expExitCode"])):
117            result = False
118            print("exit:", exit_code, int(tidx["expExitCode"]))
119            print(procout)
120        else:
121            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
122            (p, procout) = exec_cmd(tidx["verifyCmd"])
123            match_index = re.findall(match_pattern, procout)
124            if len(match_index) != int(tidx["matchCount"]):
125                result = False
126
127        if result == True:
128            tresult += "ok "
129        else:
130            tresult += "not ok "
131        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
132
133        if result == False:
134            tap += procout
135
136        prepare_env(tidx["teardown"])
137        index += 1
138
139    return tap
140
141
def ns_create():
    """
    Set up the test network namespace and its devices.

    Does nothing unless USE_NS is enabled. Otherwise the commands below are
    run in order, outside the namespace (nsonly=False): the namespace is
    added, a $DEV0/$DEV1 veth pair is created, $DEV1 and $DEV2 are moved
    into the namespace, and the links are brought up.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    for step in setup_cmds:
        exec_cmd(step, False)
162
163
def ns_destroy():
    """
    Delete the test network namespace.

    Does nothing unless USE_NS is enabled. The delete command is run
    outside the namespace (nsonly=False).
    """
    if not USE_NS:
        return
    exec_cmd('ip netns delete $NS', False)
172
173
def has_blank_ids(idlist):
    """
    Report whether any entry of the ID list is empty (falsy).

    Returns False for an empty list.
    """
    return any(not entry for entry in idlist)
179
180
def load_from_file(filename):
    """
    Read the test cases stored in a JSON file.

    Objects are loaded as OrderedDicts so key order is kept. If at least
    one test case in the file has a blank ID, every test case from the
    file is tagged with a 'filename' key holding the file's path.

    Returns the loaded test cases.
    """
    with open(filename) as test_data:
        cases = json.load(test_data, object_pairs_hook=OrderedDict)

    if not has_blank_ids(get_id_list(cases)):
        return cases

    for case in cases:
        case['filename'] = filename
    return cases
193
194
def args_parse():
    """
    Build and return a new, option-less argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
201
202
def set_args(parser):
    """
    Register tdc's command line options on the given parser.

    The options are described in a table of (flags, keyword arguments)
    pairs and added in that order. Returns the same parser object.
    """
    option_table = [
        (('-p', '--path'),
         dict(type=str,
              help='The full path to the tc executable to use')),
        (('-c', '--category'),
         dict(type=str, nargs='?', const='+c',
              help='Run tests only from the specified category, or if no category is specified, list known categories.')),
        (('-f', '--file'),
         dict(type=str,
              help='Run tests from the specified file')),
        (('-l', '--list'),
         dict(type=str, nargs='?', const="", metavar='CATEGORY',
              help='List all test cases, or those only within the specified category')),
        (('-s', '--show'),
         dict(type=str, nargs=1, metavar='ID', dest='showID',
              help='Display the test case with specified id')),
        (('-e', '--execute'),
         dict(type=str, nargs=1, metavar='ID',
              help='Execute the single test case with specified ID')),
        (('-i', '--id'),
         dict(action='store_true', dest='gen_id',
              help='Generate ID numbers for new test cases')),
        (('-d', '--device'),
         dict(help='Execute the test case in flower category')),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser
224
225
def check_default_settings(args):
    """
    Apply command line overrides to NAMES and validate the tc path.

    args.path, when given, replaces NAMES['TC']; args.device, when given,
    replaces NAMES['DEV2']. If NAMES['TC'] then does not point at an
    existing file, a message is printed and the script exits with status 1.
    """
    # NAMES is only mutated in place, never rebound.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device

    if os.path.isfile(NAMES['TC']):
        return

    print("The specified tc path " + NAMES['TC'] + " does not exist.")
    exit(1)
241
242
def get_id_list(alltests):
    """
    Collect the "id" value of every test case, in order, into a list.
    """
    ids = []
    for case in alltests:
        ids.append(case["id"])
    return ids
248
249
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns a list containing every ID that occurs more than once, in the
    order the IDs appear in alltests; each duplicated ID is listed once
    per occurrence. The list is empty when all IDs are unique.
    """
    # Local import: Counter is not among the names imported at file level.
    from collections import Counter

    idl = get_id_list(alltests)
    # Count every ID once up front instead of calling list.count() per
    # element, which made this check quadratic in the number of tests.
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]
256
257
def does_id_exist(alltests, newid):
    """
    Tell whether newid equals the ID of any test case in alltests.
    """
    for existing in get_id_list(alltests):
        if newid == existing:
            return True
    return False
264
265
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Every test case with an empty "id" receives a random four-digit hex
    ID that is not used by any other test case in alltests. Test cases
    carrying a 'filename' key are then grouped by that file name, the key
    is removed, and each group is written to its file as JSON (indent=4),
    replacing the file's contents.

    Returns alltests, which is modified in place.
    """
    import random

    # Track the IDs in use in a set so each uniqueness check is a constant
    # time lookup rather than a scan over every test case.
    used_ids = {c["id"] for c in alltests}
    for c in alltests:
        if c["id"] == "":
            newid = '%04x' % random.randrange(16**4)
            while newid in used_ids:
                newid = '%04x' % random.randrange(16**4)
            c['id'] = newid
            used_ids.add(newid)

    # Group the tagged test cases by originating file in a single pass.
    # The 'filename' tag is bookkeeping only and must not be written out.
    tests_by_file = {}
    for t in alltests:
        if 'filename' in t:
            tests_by_file.setdefault(t.pop('filename'), []).append(t)

    for fname, testlist in tests_by_file.items():
        # The context manager closes the file even if serialization fails.
        with open(fname, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests
297
298
def get_test_cases(args):
    """
    Load the test cases selected by the command line arguments.

    With args.file set, only that file is loaded; if it does not exist a
    message is printed and the script exits with status 1. Without it,
    the 'tc-tests' directory tree is walked and every *.json file found
    is loaded.

    Returns one list holding the test cases of all loaded files.
    """
    import fnmatch

    if args.file is None:
        flist = []
        for root, dirnames, filenames in os.walk('tc-tests'):
            json_names = fnmatch.filter(filenames, '*.json')
            flist.extend(os.path.join(root, name) for name in json_names)
    else:
        if not os.path.isfile(args.file):
            print("The specified test case file " + args.file + " does not exist.")
            exit(1)
        flist = [args.file]

    alltests = []
    for casefile in flist:
        alltests = alltests + load_from_file(casefile)
    return alltests
320
321
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Modes are checked in this order: ID generation (-i), duplicate ID
    validation, show a test case (-s), list categories (-c without a
    value), list test cases (-l), and finally test execution, either for
    one category (-c CATEGORY) or for all tests, optionally restricted to
    IDs containing the -e value. Execution requires root privileges.

    Every informational mode and every error ends the script through
    sys.exit(); the function only returns after a completed test run.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        idlist = get_id_list(alltests)
        if has_blank_ids(idlist):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if duplicate_ids:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        sys.exit(0)

    target_id = args.execute[0] if args.execute else ""

    # '+c' is the const stored when -c is given without a category.
    if args.category == '+c':
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)
    target_category = args.category if args.category else ""

    testcases = get_categorized_testlist(alltests, ucat)

    # A bare -l stores the empty string, which is falsy, so the option has
    # to be detected by comparing against None rather than by truthiness.
    if args.list is not None:
        if args.list == "":
            list_test_cases(alltests)
            sys.exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            sys.exit(1)
        list_test_cases(testcases[args.list])
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    # Validate the selection before creating the namespace, so that an
    # invalid request cannot exit while leaving the namespace behind.
    if target_category:
        if target_category == "flower" and args.device is None:
            print("Please specify a NIC device (-d) to run category flower")
            sys.exit(1)
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            sys.exit(1)
        selected = testcases[target_category]
        heading = "Category " + target_category
    else:
        if target_id:
            alltests = list(filter(lambda x: target_id in x['id'], alltests))
            if not alltests:
                print("Cannot find a test case with ID matching " + target_id)
                sys.exit(1)
        selected = alltests
        heading = "All test results: "

    ns_create()
    try:
        catresults = test_runner(selected, args)
        print(heading + "\n\n" + catresults)
    finally:
        # Always clean up, including when the run is aborted part-way.
        # prepare_env() may already have destroyed the namespace before
        # exiting, in which case this second delete attempt just fails
        # and its result is ignored.
        ns_destroy()
408
409
def main():
    """
    Entry point: build the argument parser, parse the command line,
    apply setting overrides and dispatch to the requested operation.

    Command line arguments the parser does not recognize are accepted
    and ignored. The script exits with status 0 once the operation
    returns.
    """
    parser = set_args(args_parse())
    args, _unrecognized = parser.parse_known_args()

    check_default_settings(args)
    set_operation_mode(args)

    exit(0)
423
424
# Run the driver only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
427