1#!/usr/bin/env python3 2 3"""Analyze the test outcomes from a full CI run. 4 5This script can also run on outcomes from a partial run, but the results are 6less likely to be useful. 7""" 8 9import argparse 10import sys 11import traceback 12import re 13 14import check_test_cases 15 16class Results: 17 """Process analysis results.""" 18 19 def __init__(self): 20 self.error_count = 0 21 self.warning_count = 0 22 23 @staticmethod 24 def log(fmt, *args, **kwargs): 25 sys.stderr.write((fmt + '\n').format(*args, **kwargs)) 26 27 def error(self, fmt, *args, **kwargs): 28 self.log('Error: ' + fmt, *args, **kwargs) 29 self.error_count += 1 30 31 def warning(self, fmt, *args, **kwargs): 32 self.log('Warning: ' + fmt, *args, **kwargs) 33 self.warning_count += 1 34 35class TestCaseOutcomes: 36 """The outcomes of one test case across many configurations.""" 37 # pylint: disable=too-few-public-methods 38 39 def __init__(self): 40 # Collect a list of witnesses of the test case succeeding or failing. 41 # Currently we don't do anything with witnesses except count them. 42 # The format of a witness is determined by the read_outcome_file 43 # function; it's the platform and configuration joined by ';'. 44 self.successes = [] 45 self.failures = [] 46 47 def hits(self): 48 """Return the number of times a test case has been run. 49 50 This includes passes and failures, but not skips. 51 """ 52 return len(self.successes) + len(self.failures) 53 54def analyze_coverage(results, outcomes): 55 """Check that all available test cases are executed at least once.""" 56 available = check_test_cases.collect_available_test_cases() 57 for key in available: 58 hits = outcomes[key].hits() if key in outcomes else 0 59 if hits == 0: 60 # Make this a warning, not an error, as long as we haven't 61 # fixed this branch to have full coverage of test cases. 62 results.warning('Test case not executed: {}', key) 63 64def analyze_driver_vs_reference(outcomes, component_ref, component_driver, ignored_tests): 65 """Check that all tests executed in the reference component are also 66 executed in the corresponding driver component. 67 Skip test suites provided in ignored_tests list. 68 """ 69 available = check_test_cases.collect_available_test_cases() 70 result = True 71 72 for key in available: 73 # Skip ignored test suites 74 test_suite = key.split(';')[0] # retrieve test suit name 75 test_suite = test_suite.split('.')[0] # retrieve main part of test suit name 76 if test_suite in ignored_tests: 77 continue 78 # Continue if test was not executed by any component 79 hits = outcomes[key].hits() if key in outcomes else 0 80 if hits == 0: 81 continue 82 # Search for tests that run in reference component and not in driver component 83 driver_test_passed = False 84 reference_test_passed = False 85 for entry in outcomes[key].successes: 86 if component_driver in entry: 87 driver_test_passed = True 88 if component_ref in entry: 89 reference_test_passed = True 90 if(driver_test_passed is False and reference_test_passed is True): 91 print('{}: driver: skipped/failed; reference: passed'.format(key)) 92 result = False 93 return result 94 95def analyze_outcomes(outcomes): 96 """Run all analyses on the given outcome collection.""" 97 results = Results() 98 analyze_coverage(results, outcomes) 99 return results 100 101def read_outcome_file(outcome_file): 102 """Parse an outcome file and return an outcome collection. 103 104An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects. 105The keys are the test suite name and the test case description, separated 106by a semicolon. 107""" 108 outcomes = {} 109 with open(outcome_file, 'r', encoding='utf-8') as input_file: 110 for line in input_file: 111 (platform, config, suite, case, result, _cause) = line.split(';') 112 key = ';'.join([suite, case]) 113 setup = ';'.join([platform, config]) 114 if key not in outcomes: 115 outcomes[key] = TestCaseOutcomes() 116 if result == 'PASS': 117 outcomes[key].successes.append(setup) 118 elif result == 'FAIL': 119 outcomes[key].failures.append(setup) 120 return outcomes 121 122def do_analyze_coverage(outcome_file, args): 123 """Perform coverage analysis.""" 124 del args # unused 125 outcomes = read_outcome_file(outcome_file) 126 results = analyze_outcomes(outcomes) 127 return results.error_count == 0 128 129def do_analyze_driver_vs_reference(outcome_file, args): 130 """Perform driver vs reference analyze.""" 131 ignored_tests = ['test_suite_' + x for x in args['ignored_suites']] 132 133 outcomes = read_outcome_file(outcome_file) 134 return analyze_driver_vs_reference(outcomes, args['component_ref'], 135 args['component_driver'], ignored_tests) 136 137# List of tasks with a function that can handle this task and additional arguments if required 138TASKS = { 139 'analyze_coverage': { 140 'test_function': do_analyze_coverage, 141 'args': {}}, 142 'analyze_driver_vs_reference_hash': { 143 'test_function': do_analyze_driver_vs_reference, 144 'args': { 145 'component_ref': 'test_psa_crypto_config_reference_hash_use_psa', 146 'component_driver': 'test_psa_crypto_config_accel_hash_use_psa', 147 'ignored_suites': ['shax', 'mdx', # the software implementations that are being excluded 148 'md', # the legacy abstraction layer that's being excluded 149 ]}} 150} 151 152def main(): 153 try: 154 parser = argparse.ArgumentParser(description=__doc__) 155 parser.add_argument('outcomes', metavar='OUTCOMES.CSV', 156 help='Outcome file to analyze') 157 parser.add_argument('task', default='all', nargs='?', 158 help='Analysis to be done. By default, run all tasks. ' 159 'With one or more TASK, run only those. ' 160 'TASK can be the name of a single task or ' 161 'comma/space-separated list of tasks. ') 162 parser.add_argument('--list', action='store_true', 163 help='List all available tasks and exit.') 164 options = parser.parse_args() 165 166 if options.list: 167 for task in TASKS: 168 print(task) 169 sys.exit(0) 170 171 result = True 172 173 if options.task == 'all': 174 tasks = TASKS.keys() 175 else: 176 tasks = re.split(r'[, ]+', options.task) 177 178 for task in tasks: 179 if task not in TASKS: 180 print('Error: invalid task: {}'.format(task)) 181 sys.exit(1) 182 183 for task in TASKS: 184 if task in tasks: 185 if not TASKS[task]['test_function'](options.outcomes, TASKS[task]['args']): 186 result = False 187 188 if result is False: 189 sys.exit(1) 190 print("SUCCESS :-)") 191 except Exception: # pylint: disable=broad-except 192 # Print the backtrace and exit explicitly with our chosen status. 193 traceback.print_exc() 194 sys.exit(120) 195 196if __name__ == '__main__': 197 main() 198