1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3 4""" 5tdc.py - Linux tc (Traffic Control) unit test driver 6 7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> 8""" 9 10import re 11import os 12import sys 13import argparse 14import json 15import subprocess 16from collections import OrderedDict 17from string import Template 18 19from tdc_config import * 20from tdc_helper import * 21 22 23USE_NS = True 24 25 26def replace_keywords(cmd): 27 """ 28 For a given executable command, substitute any known 29 variables contained within NAMES with the correct values 30 """ 31 tcmd = Template(cmd) 32 subcmd = tcmd.safe_substitute(NAMES) 33 return subcmd 34 35 36def exec_cmd(command, nsonly=True): 37 """ 38 Perform any required modifications on an executable command, then run 39 it in a subprocess and return the results. 40 """ 41 if (USE_NS and nsonly): 42 command = 'ip netns exec $NS ' + command 43 44 if '$' in command: 45 command = replace_keywords(command) 46 47 proc = subprocess.Popen(command, 48 shell=True, 49 stdout=subprocess.PIPE, 50 stderr=subprocess.PIPE) 51 (rawout, serr) = proc.communicate() 52 53 if proc.returncode != 0: 54 foutput = serr.decode("utf-8") 55 else: 56 foutput = rawout.decode("utf-8") 57 58 proc.stdout.close() 59 proc.stderr.close() 60 return proc, foutput 61 62 63def prepare_env(cmdlist): 64 """ 65 Execute the setup/teardown commands for a test case. Optionally 66 terminate test execution if the command fails. 67 """ 68 for cmdinfo in cmdlist: 69 if (type(cmdinfo) == list): 70 exit_codes = cmdinfo[1:] 71 cmd = cmdinfo[0] 72 else: 73 exit_codes = [0] 74 cmd = cmdinfo 75 76 if (len(cmd) == 0): 77 continue 78 79 (proc, foutput) = exec_cmd(cmd) 80 81 if proc.returncode not in exit_codes: 82 print 83 print("Could not execute:") 84 print(cmd) 85 print("\nError message:") 86 print(foutput) 87 print("\nAborting test run.") 88 ns_destroy() 89 exit(1) 90 91 92def test_runner(filtered_tests, args): 93 """ 94 Driver function for the unit tests. 95 96 Prints information about the tests being run, executes the setup and 97 teardown commands and the command under test itself. Also determines 98 success/failure based on the information in the test case and generates 99 TAP output accordingly. 100 """ 101 testlist = filtered_tests 102 tcount = len(testlist) 103 index = 1 104 tap = str(index) + ".." + str(tcount) + "\n" 105 106 for tidx in testlist: 107 result = True 108 tresult = "" 109 if "flower" in tidx["category"] and args.device == None: 110 continue 111 print("Test " + tidx["id"] + ": " + tidx["name"]) 112 prepare_env(tidx["setup"]) 113 (p, procout) = exec_cmd(tidx["cmdUnderTest"]) 114 exit_code = p.returncode 115 116 if (exit_code != int(tidx["expExitCode"])): 117 result = False 118 print("exit:", exit_code, int(tidx["expExitCode"])) 119 print(procout) 120 else: 121 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL) 122 (p, procout) = exec_cmd(tidx["verifyCmd"]) 123 match_index = re.findall(match_pattern, procout) 124 if len(match_index) != int(tidx["matchCount"]): 125 result = False 126 127 if result == True: 128 tresult += "ok " 129 else: 130 tresult += "not ok " 131 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n" 132 133 if result == False: 134 tap += procout 135 136 prepare_env(tidx["teardown"]) 137 index += 1 138 139 return tap 140 141 142def ns_create(): 143 """ 144 Create the network namespace in which the tests will be run and set up 145 the required network devices for it. 146 """ 147 if (USE_NS): 148 cmd = 'ip netns add $NS' 149 exec_cmd(cmd, False) 150 cmd = 'ip link add $DEV0 type veth peer name $DEV1' 151 exec_cmd(cmd, False) 152 cmd = 'ip link set $DEV1 netns $NS' 153 exec_cmd(cmd, False) 154 cmd = 'ip link set $DEV0 up' 155 exec_cmd(cmd, False) 156 cmd = 'ip -n $NS link set $DEV1 up' 157 exec_cmd(cmd, False) 158 cmd = 'ip link set $DEV2 netns $NS' 159 exec_cmd(cmd, False) 160 cmd = 'ip -n $NS link set $DEV2 up' 161 exec_cmd(cmd, False) 162 163 164def ns_destroy(): 165 """ 166 Destroy the network namespace for testing (and any associated network 167 devices as well) 168 """ 169 if (USE_NS): 170 cmd = 'ip netns delete $NS' 171 exec_cmd(cmd, False) 172 173 174def has_blank_ids(idlist): 175 """ 176 Search the list for empty ID fields and return true/false accordingly. 177 """ 178 return not(all(k for k in idlist)) 179 180 181def load_from_file(filename): 182 """ 183 Open the JSON file containing the test cases and return them as an 184 ordered dictionary object. 185 """ 186 with open(filename) as test_data: 187 testlist = json.load(test_data, object_pairs_hook=OrderedDict) 188 idlist = get_id_list(testlist) 189 if (has_blank_ids(idlist)): 190 for k in testlist: 191 k['filename'] = filename 192 return testlist 193 194 195def args_parse(): 196 """ 197 Create the argument parser. 198 """ 199 parser = argparse.ArgumentParser(description='Linux TC unit tests') 200 return parser 201 202 203def set_args(parser): 204 """ 205 Set the command line arguments for tdc. 206 """ 207 parser.add_argument('-p', '--path', type=str, 208 help='The full path to the tc executable to use') 209 parser.add_argument('-c', '--category', type=str, nargs='?', const='+c', 210 help='Run tests only from the specified category, or if no category is specified, list known categories.') 211 parser.add_argument('-f', '--file', type=str, 212 help='Run tests from the specified file') 213 parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY', 214 help='List all test cases, or those only within the specified category') 215 parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID', 216 help='Display the test case with specified id') 217 parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID', 218 help='Execute the single test case with specified ID') 219 parser.add_argument('-i', '--id', action='store_true', dest='gen_id', 220 help='Generate ID numbers for new test cases') 221 parser.add_argument('-d', '--device', 222 help='Execute the test case in flower category') 223 return parser 224 225 226def check_default_settings(args): 227 """ 228 Process any arguments overriding the default settings, and ensure the 229 settings are correct. 230 """ 231 # Allow for overriding specific settings 232 global NAMES 233 234 if args.path != None: 235 NAMES['TC'] = args.path 236 if args.device != None: 237 NAMES['DEV2'] = args.device 238 if not os.path.isfile(NAMES['TC']): 239 print("The specified tc path " + NAMES['TC'] + " does not exist.") 240 exit(1) 241 242 243def get_id_list(alltests): 244 """ 245 Generate a list of all IDs in the test cases. 246 """ 247 return [x["id"] for x in alltests] 248 249 250def check_case_id(alltests): 251 """ 252 Check for duplicate test case IDs. 253 """ 254 idl = get_id_list(alltests) 255 return [x for x in idl if idl.count(x) > 1] 256 257 258def does_id_exist(alltests, newid): 259 """ 260 Check if a given ID already exists in the list of test cases. 261 """ 262 idl = get_id_list(alltests) 263 return (any(newid == x for x in idl)) 264 265 266def generate_case_ids(alltests): 267 """ 268 If a test case has a blank ID field, generate a random hex ID for it 269 and then write the test cases back to disk. 270 """ 271 import random 272 for c in alltests: 273 if (c["id"] == ""): 274 while True: 275 newid = str('%04x' % random.randrange(16**4)) 276 if (does_id_exist(alltests, newid)): 277 continue 278 else: 279 c['id'] = newid 280 break 281 282 ufilename = [] 283 for c in alltests: 284 if ('filename' in c): 285 ufilename.append(c['filename']) 286 ufilename = get_unique_item(ufilename) 287 for f in ufilename: 288 testlist = [] 289 for t in alltests: 290 if 'filename' in t: 291 if t['filename'] == f: 292 del t['filename'] 293 testlist.append(t) 294 outfile = open(f, "w") 295 json.dump(testlist, outfile, indent=4) 296 outfile.close() 297 298 299def get_test_cases(args): 300 """ 301 If a test case file is specified, retrieve tests from that file. 302 Otherwise, glob for all json files in subdirectories and load from 303 each one. 304 """ 305 import fnmatch 306 if args.file != None: 307 if not os.path.isfile(args.file): 308 print("The specified test case file " + args.file + " does not exist.") 309 exit(1) 310 flist = [args.file] 311 else: 312 flist = [] 313 for root, dirnames, filenames in os.walk('tc-tests'): 314 for filename in fnmatch.filter(filenames, '*.json'): 315 flist.append(os.path.join(root, filename)) 316 alltests = list() 317 for casefile in flist: 318 alltests = alltests + (load_from_file(casefile)) 319 return alltests 320 321 322def set_operation_mode(args): 323 """ 324 Load the test case data and process remaining arguments to determine 325 what the script should do for this run, and call the appropriate 326 function. 327 """ 328 alltests = get_test_cases(args) 329 330 if args.gen_id: 331 idlist = get_id_list(alltests) 332 if (has_blank_ids(idlist)): 333 alltests = generate_case_ids(alltests) 334 else: 335 print("No empty ID fields found in test files.") 336 exit(0) 337 338 duplicate_ids = check_case_id(alltests) 339 if (len(duplicate_ids) > 0): 340 print("The following test case IDs are not unique:") 341 print(str(set(duplicate_ids))) 342 print("Please correct them before continuing.") 343 exit(1) 344 345 ucat = get_test_categories(alltests) 346 347 if args.showID: 348 show_test_case_by_id(alltests, args.showID[0]) 349 exit(0) 350 351 if args.execute: 352 target_id = args.execute[0] 353 else: 354 target_id = "" 355 356 if args.category: 357 if (args.category == '+c'): 358 print("Available categories:") 359 print_sll(ucat) 360 exit(0) 361 else: 362 target_category = args.category 363 else: 364 target_category = "" 365 366 367 testcases = get_categorized_testlist(alltests, ucat) 368 369 if args.list: 370 if (len(args.list) == 0): 371 list_test_cases(alltests) 372 exit(0) 373 elif(len(args.list > 0)): 374 if (args.list not in ucat): 375 print("Unknown category " + args.list) 376 print("Available categories:") 377 print_sll(ucat) 378 exit(1) 379 list_test_cases(testcases[args.list]) 380 exit(0) 381 382 if (os.geteuid() != 0): 383 print("This script must be run with root privileges.\n") 384 exit(1) 385 386 ns_create() 387 388 if (len(target_category) == 0): 389 if (len(target_id) > 0): 390 alltests = list(filter(lambda x: target_id in x['id'], alltests)) 391 if (len(alltests) == 0): 392 print("Cannot find a test case with ID matching " + target_id) 393 exit(1) 394 catresults = test_runner(alltests, args) 395 print("All test results: " + "\n\n" + catresults) 396 elif (len(target_category) > 0): 397 if (target_category == "flower") and args.device == None: 398 print("Please specify a NIC device (-d) to run category flower") 399 exit(1) 400 if (target_category not in ucat): 401 print("Specified category is not present in this file.") 402 exit(1) 403 else: 404 catresults = test_runner(testcases[target_category], args) 405 print("Category " + target_category + "\n\n" + catresults) 406 407 ns_destroy() 408 409 410def main(): 411 """ 412 Start of execution; set up argument parser and get the arguments, 413 and start operations. 414 """ 415 parser = args_parse() 416 parser = set_args(parser) 417 (args, remaining) = parser.parse_known_args() 418 check_default_settings(args) 419 420 set_operation_mode(args) 421 422 exit(0) 423 424 425if __name__ == "__main__": 426 main() 427