#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import fnmatch
import importlib
import json
import random
import subprocess
import time
import traceback
from collections import Counter, OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin


class PluginMgrTestFail(Exception):
    """
    Raised when a setup or teardown command of a test case fails.

    Attributes:
        stage:   name of the stage that failed ('setup' or 'teardown')
        output:  output accumulated for the test so far (may be None)
        message: human readable description of the failure
    """
    def __init__(self, stage, output, message):
        # Hand the message to Exception so str(exc) is not empty.
        super().__init__(message)
        self.stage = stage
        self.output = output
        self.message = message


class PluginMgr:
    """
    Load every plugin module found below the plugin directory and fan
    the test life-cycle hooks out to one SubPlugin instance per module.

    "pre" hooks are called in load order, "post" hooks in reverse order.
    """
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                # Skip the package marker and editor lock/autosave files.
                if (fn.endswith('.py') and
                        fn != '__init__.py' and
                        not fn.startswith(('#', '.#'))):
                    mn = fn[0:-3]
                    module = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = module
                    self.plugin_instances.append(module.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        """Call pre_suite(testcount, testidlist) on every plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        """Call post_suite(index) on every plugin, in reverse order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        """
        Call pre_case on every plugin.  An exception raised by a plugin
        is reported on stdout and then re-raised.
        """
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        """Call post_case() on every plugin, in reverse order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        """Call pre_execute() on every plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        """Call post_execute() on every plugin, in reverse order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        """
        Let every plugin add its own command line arguments.
        Each plugin returns the parser that is passed on to the next one.
        """
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        """Call check_args(args, remaining) on every plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        """
        Pass the command through adjust_command() of every plugin in
        turn and return the final command.
        """
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    @staticmethod
    def _make_argparser(args):
        """
        Build and return a fresh argument parser.

        This used to assign to self.argparser, which cannot work in a
        static method (NameError); the parser is returned instead.
        The args parameter is unused and kept for compatibility.
        """
        return argparse.ArgumentParser(
            description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    # safe_substitute leaves unknown $placeholders untouched.
    return Template(cmd).safe_substitute(NAMES)


def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    Returns a (proc, output) tuple; (None, None) if the command is blank.
    output is the decoded stderr when the command failed and wrote to
    stderr, otherwise the decoded stdout.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    # Test case commands are shell snippets, hence shell=True.
    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=ENVIR)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8", errors="ignore")
    else:
        foutput = rawout.decode("utf-8", errors="ignore")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput


def prepare_env(args, pm, stage, prefix, cmdlist, output=None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.

    Each entry of cmdlist is either a command string (expected exit
    code 0) or a list [command, allowed_exit_code, ...].

    Raises PluginMgrTestFail if a command exits with an unexpected code.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("returncode {}; expected {}".format(proc.returncode,
                                                      exit_codes))
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))


def run_one_test(pm, args, index, tidx):
    """
    Run the setup, execute, verify and teardown stages of one test case
    and return its TAP output.

    Raises PluginMgrTestFail if a setup or teardown command fails.
    """
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
        print("Test " + tidx["id"] + ": " + tidx["name"])

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']
    try:
        pm.call_pre_case(index, tidx['id'])
        prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

        if args.verbose > 0:
            print('-----> execute stage')
        pm.call_pre_execute()
        (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
        exit_code = p.returncode if p else None

        pm.call_post_execute()

        if exit_code is None or exit_code != int(tidx["expExitCode"]):
            result = False
            print("exit: {!r}".format(exit_code))
            print("exit: {}".format(int(tidx["expExitCode"])))
            print(procout)
        else:
            if args.verbose > 0:
                print('-----> verify stage')
            match_pattern = re.compile(
                str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
            (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
            if procout:
                match_index = re.findall(match_pattern, procout)
                if len(match_index) != int(tidx["matchCount"]):
                    result = False
            elif int(tidx["matchCount"]) != 0:
                # No output at all cannot satisfy a non-zero match count.
                result = False

        if not result:
            tresult += 'not '
        tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
        tap += tresult

        if not result:
            if procout:
                tap += procout
            else:
                tap += 'No output!\n'

        prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
        pm.call_post_case()
        return tap
    finally:
        # remove TESTID from NAMES, also when a stage raised
        NAMES.pop('TESTID', None)


def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    if args.notap:
        if args.verbose:
            tap = 'notap requested:  omitting test plan\n'
    else:
        tap = str(index) + ".." + str(tcount) + "\n"
    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = type(ee), ee, ee.__traceback__
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        # The traceback is not wrapped in delimiter lines: it shows up
        # much earlier in the tdc.py output than the surrounding prints.
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    if emergency_exit:
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        if "flower" in tidx["category"] and args.device is None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = type(pmtf), pmtf, pmtf.__traceback__
            stage = pmtf.stage
            message = pmtf.message
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    if not args.notap:
        tap += 'about to flush the tap output if tests need to be skipped\n'
        if tcount + 1 != index:
            # badtest is still None when no test was ever started
            # (e.g. every test was skipped for lack of --device).
            if badtest is None:
                bad_id = '--Unknown--'
            else:
                bad_id = badtest.get('id', '--Unknown--')
            for tidx in testlist[index - 1:]:
                msg = 'skipped - previous {} failed'.format(stage)
                tap += 'ok {} - {} # {} {} {}\n'.format(
                    count, tidx['id'], msg, index, bad_id)
                count += 1

        tap += 'done flushing skipped test tap output\n'

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        # input() takes a prompt string, not a stream.
        if input():
            print('got something on stdin')

    pm.call_post_suite(index)

    return tap


def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.

    A file that is not valid JSON is reported and yields an empty list.
    If any test in the file has a blank ID, every test of the file is
    tagged with a 'filename' key so generate_case_ids() can write the
    file back.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if has_blank_ids(idlist):
            for k in testlist:
                k['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    # The '+c' default marks "option not given"; an explicit, empty -c
    # yields [] and means "list the known categories".
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '-N', '--notap', action='store_true',
        help='Suppress tap results for command under test')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser


def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.

    Exits with status 1 if the tc executable does not exist.
    """
    # Allow for overriding specific settings
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        sys.exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every ID that occurs more than once, once per occurrence.
    """
    idl = get_id_list(alltests)
    counts = Counter(idl)
    return [x for x in idl if counts[x] > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    return newid in get_id_list(alltests)


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Only tests carrying a 'filename' key are written back; the key is
    removed before writing.  Returns the updated list of test cases.
    """
    used_ids = set(get_id_list(alltests))
    for c in alltests:
        if c["id"] == "":
            # Draw 4-digit hex IDs until one is unused.
            while True:
                newid = '{:04x}'.format(random.randrange(16**4))
                if newid not in used_ids:
                    break
            c['id'] = newid
            used_ids.add(newid)

    ufilename = get_unique_item(
        [c['filename'] for c in alltests if 'filename' in c])
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t and t['filename'] == f:
                del t['filename']
                testlist.append(t)
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            outfile.write("\n")
    return alltests


def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            wanted = set(target_ids)
            newlist = [x for x in testlist if x['id'] in wanted]
    return newlist


def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if args.category and testlist:
        test_ids = set()
        # dict.fromkeys removes repeated categories but keeps the
        # command line order, so the result is deterministic.
        for catg in dict.fromkeys(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.add(tc['id'])

    return answer


def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns (all categories, all ids, tests grouped by category,
    selected tests).
    """
    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # Do not load a file twice when it is named by both
                # --file and --directory.
                if candidate not in flist:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases.extend(load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases


def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        sys.exit(0)

    # An explicit -c without values means "list the categories".
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)

    if args.list:
        list_test_cases(alltests)
        sys.exit(0)

    if alltests:
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    if args.notap:
        print('Tap output suppression requested\n')
    else:
        print('All test results: \n\n{}'.format(catresults))


def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    sys.exit(0)


if __name__ == "__main__":
    main()