1#!/usr/bin/env python3 2 3"""Analyze the test outcomes from a full CI run. 4 5This script can also run on outcomes from a partial run, but the results are 6less likely to be useful. 7""" 8 9import argparse 10import sys 11import traceback 12import re 13 14import check_test_cases 15 16class Results: 17 """Process analysis results.""" 18 19 def __init__(self): 20 self.error_count = 0 21 self.warning_count = 0 22 23 @staticmethod 24 def log(fmt, *args, **kwargs): 25 sys.stderr.write((fmt + '\n').format(*args, **kwargs)) 26 27 def error(self, fmt, *args, **kwargs): 28 self.log('Error: ' + fmt, *args, **kwargs) 29 self.error_count += 1 30 31 def warning(self, fmt, *args, **kwargs): 32 self.log('Warning: ' + fmt, *args, **kwargs) 33 self.warning_count += 1 34 35class TestCaseOutcomes: 36 """The outcomes of one test case across many configurations.""" 37 # pylint: disable=too-few-public-methods 38 39 def __init__(self): 40 # Collect a list of witnesses of the test case succeeding or failing. 41 # Currently we don't do anything with witnesses except count them. 42 # The format of a witness is determined by the read_outcome_file 43 # function; it's the platform and configuration joined by ';'. 44 self.successes = [] 45 self.failures = [] 46 47 def hits(self): 48 """Return the number of times a test case has been run. 49 50 This includes passes and failures, but not skips. 51 """ 52 return len(self.successes) + len(self.failures) 53 54def analyze_coverage(results, outcomes, allow_list, full_coverage): 55 """Check that all available test cases are executed at least once.""" 56 available = check_test_cases.collect_available_test_cases() 57 for key in available: 58 hits = outcomes[key].hits() if key in outcomes else 0 59 if hits == 0 and key not in allow_list: 60 if full_coverage: 61 results.error('Test case not executed: {}', key) 62 else: 63 results.warning('Test case not executed: {}', key) 64 elif hits != 0 and key in allow_list: 65 # Test Case should be removed from the allow list. 66 if full_coverage: 67 results.error('Allow listed test case was executed: {}', key) 68 else: 69 results.warning('Allow listed test case was executed: {}', key) 70 71def analyze_outcomes(outcomes, args): 72 """Run all analyses on the given outcome collection.""" 73 results = Results() 74 analyze_coverage(results, outcomes, args['allow_list'], 75 args['full_coverage']) 76 return results 77 78def read_outcome_file(outcome_file): 79 """Parse an outcome file and return an outcome collection. 80 81An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects. 82The keys are the test suite name and the test case description, separated 83by a semicolon. 84""" 85 outcomes = {} 86 with open(outcome_file, 'r', encoding='utf-8') as input_file: 87 for line in input_file: 88 (platform, config, suite, case, result, _cause) = line.split(';') 89 key = ';'.join([suite, case]) 90 setup = ';'.join([platform, config]) 91 if key not in outcomes: 92 outcomes[key] = TestCaseOutcomes() 93 if result == 'PASS': 94 outcomes[key].successes.append(setup) 95 elif result == 'FAIL': 96 outcomes[key].failures.append(setup) 97 return outcomes 98 99def do_analyze_coverage(outcome_file, args): 100 """Perform coverage analysis.""" 101 outcomes = read_outcome_file(outcome_file) 102 Results.log("\n*** Analyze coverage ***\n") 103 results = analyze_outcomes(outcomes, args) 104 return results.error_count == 0 105 106# List of tasks with a function that can handle this task and additional arguments if required 107TASKS = { 108 'analyze_coverage': { 109 'test_function': do_analyze_coverage, 110 'args': { 111 'allow_list': [ 112 # Algorithm not supported yet 113 'test_suite_psa_crypto_metadata;Asymmetric signature: pure EdDSA', 114 # Algorithm not supported yet 115 'test_suite_psa_crypto_metadata;Cipher: XTS', 116 ], 117 'full_coverage': False, 118 } 119 }, 120} 121 122def main(): 123 try: 124 parser = argparse.ArgumentParser(description=__doc__) 125 parser.add_argument('outcomes', metavar='OUTCOMES.CSV', 126 help='Outcome file to analyze') 127 parser.add_argument('task', default='all', nargs='?', 128 help='Analysis to be done. By default, run all tasks. ' 129 'With one or more TASK, run only those. ' 130 'TASK can be the name of a single task or ' 131 'comma/space-separated list of tasks. ') 132 parser.add_argument('--list', action='store_true', 133 help='List all available tasks and exit.') 134 parser.add_argument('--require-full-coverage', action='store_true', 135 dest='full_coverage', help="Require all available " 136 "test cases to be executed and issue an error " 137 "otherwise. This flag is ignored if 'task' is " 138 "neither 'all' nor 'analyze_coverage'") 139 options = parser.parse_args() 140 141 if options.list: 142 for task in TASKS: 143 Results.log(task) 144 sys.exit(0) 145 146 result = True 147 148 if options.task == 'all': 149 tasks = TASKS.keys() 150 else: 151 tasks = re.split(r'[, ]+', options.task) 152 153 for task in tasks: 154 if task not in TASKS: 155 Results.log('Error: invalid task: {}'.format(task)) 156 sys.exit(1) 157 158 TASKS['analyze_coverage']['args']['full_coverage'] = \ 159 options.full_coverage 160 161 for task in TASKS: 162 if task in tasks: 163 if not TASKS[task]['test_function'](options.outcomes, TASKS[task]['args']): 164 result = False 165 166 if result is False: 167 sys.exit(1) 168 Results.log("SUCCESS :-)") 169 except Exception: # pylint: disable=broad-except 170 # Print the backtrace and exit explicitly with our chosen status. 171 traceback.print_exc() 172 sys.exit(120) 173 174if __name__ == '__main__': 175 main() 176