#!/usr/bin/env python3

"""Analyze the test outcomes from a full CI run.

This script can also run on outcomes from a partial run, but the results are
less likely to be useful.
"""

import argparse
import re
import sys
import traceback

import check_test_cases

class Results:
    """Process analysis results."""

    def __init__(self):
        self.error_count = 0
        self.warning_count = 0

    @staticmethod
    def log(fmt, *args, **kwargs):
        sys.stderr.write((fmt + '\n').format(*args, **kwargs))

    def error(self, fmt, *args, **kwargs):
        self.log('Error: ' + fmt, *args, **kwargs)
        self.error_count += 1

    def warning(self, fmt, *args, **kwargs):
        self.log('Warning: ' + fmt, *args, **kwargs)
        self.warning_count += 1

class TestCaseOutcomes:
    """The outcomes of one test case across many configurations."""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        # Collect a list of witnesses of the test case succeeding or failing.
        # Currently we don't do anything with witnesses except count them.
        # The format of a witness is determined by the read_outcome_file
        # function; it's the platform and configuration joined by ';'.
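        # For example, a witness might look like 'Linux-x86_64;full'
        # (the values here are hypothetical).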
        self.successes = []
        self.failures = []

    def hits(self):
        """Return the number of times a test case has been run.

        This includes passes and failures, but not skips.
        """
        return len(self.successes) + len(self.failures)

def analyze_coverage(results, outcomes):
    """Check that all available test cases are executed at least once."""
    available = check_test_cases.collect_available_test_cases()
    for key in available:
        hits = outcomes[key].hits() if key in outcomes else 0
        if hits == 0:
            # Make this a warning, not an error, as long as we haven't
            # fixed this branch to have full coverage of test cases.
            results.warning('Test case not executed: {}', key)

def analyze_driver_vs_reference(outcomes, component_ref, component_driver,
                                ignored_tests):
    """Check that all tests executed in the reference component are also
    executed in the corresponding driver component.

    Skip the test suites listed in ignored_tests.
    """
    available = check_test_cases.collect_available_test_cases()
    result = True

    for key in available:
        # Skip ignored test suites
        test_suite = key.split(';')[0] # retrieve the test suite name
        test_suite = test_suite.split('.')[0] # retrieve the main part of the test suite name
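        # For example, a key such as 'test_suite_shax.misc;SHA-256 hash'
        # (a made-up entry) yields the test suite name 'test_suite_shax'.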
        if test_suite in ignored_tests:
            continue
        # Skip the test if it was not executed by any component
        hits = outcomes[key].hits() if key in outcomes else 0
        if hits == 0:
            continue
        # Search for tests that pass in the reference component but not in
        # the driver component
        driver_test_passed = False
        reference_test_passed = False
        for entry in outcomes[key].successes:
            if component_driver in entry:
                driver_test_passed = True
            if component_ref in entry:
                reference_test_passed = True
        if reference_test_passed and not driver_test_passed:
            print('{}: driver: skipped/failed; reference: passed'.format(key))
            result = False
    return result

def analyze_outcomes(outcomes):
    """Run all analyses on the given outcome collection."""
    results = Results()
    analyze_coverage(results, outcomes)
    return results

def read_outcome_file(outcome_file):
    """Parse an outcome file and return an outcome collection.

    An outcome collection is a dictionary mapping keys to TestCaseOutcomes
    objects. The keys are the test suite name and the test case description,
    separated by a semicolon.
    """
    outcomes = {}
    with open(outcome_file, 'r', encoding='utf-8') as input_file:
        for line in input_file:
            (platform, config, suite, case, result, _cause) = line.split(';')
            key = ';'.join([suite, case])
            setup = ';'.join([platform, config])
            if key not in outcomes:
                outcomes[key] = TestCaseOutcomes()
            if result == 'PASS':
                outcomes[key].successes.append(setup)
            elif result == 'FAIL':
                outcomes[key].failures.append(setup)
    return outcomes

def do_analyze_coverage(outcome_file, args):
    """Perform coverage analysis."""
    del args # unused
    outcomes = read_outcome_file(outcome_file)
    results = analyze_outcomes(outcomes)
    return results.error_count == 0

def do_analyze_driver_vs_reference(outcome_file, args):
    """Perform driver vs reference analysis."""
    ignored_tests = ['test_suite_' + x for x in args['ignored_suites']]

    outcomes = read_outcome_file(outcome_file)
    return analyze_driver_vs_reference(outcomes, args['component_ref'],
                                       args['component_driver'], ignored_tests)

# Map of task names to the function that handles the task and any extra
# arguments it requires.
TASKS = {
    'analyze_coverage':                 {
        'test_function': do_analyze_coverage,
        'args': {}},
    'analyze_driver_vs_reference_hash': {
        'test_function': do_analyze_driver_vs_reference,
        'args': {
            'component_ref': 'test_psa_crypto_config_reference_hash_use_psa',
            'component_driver': 'test_psa_crypto_config_accel_hash_use_psa',
            'ignored_suites': ['shax', 'mdx', # software implementations being excluded
                               'md',  # legacy abstraction layer being excluded
                              ]}}
}
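
# To add a new analysis, define a function with the signature
# do_analyze_xxx(outcome_file, args) that returns True on success, then
# register it in TASKS, e.g. (the name below is a hypothetical placeholder):
#   'analyze_something': {'test_function': do_analyze_something, 'args': {}},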

def main():
    try:
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
                            help='Outcome file to analyze')
        parser.add_argument('task', default='all', nargs='?',
                            help='Analysis to be done. By default, run all tasks. '
                                 'With one or more TASK, run only those. '
                                 'TASK can be the name of a single task or '
                                 'a comma/space-separated list of tasks.')
        parser.add_argument('--list', action='store_true',
                            help='List all available tasks and exit.')
        options = parser.parse_args()

        if options.list:
            for task in TASKS:
                print(task)
            sys.exit(0)

        result = True

        if options.task == 'all':
            tasks = TASKS.keys()
        else:
            tasks = re.split(r'[, ]+', options.task)

            for task in tasks:
                if task not in TASKS:
                    print('Error: invalid task: {}'.format(task))
                    sys.exit(1)

        # Run the requested tasks in the order in which they appear in TASKS.
        for task in TASKS:
            if task in tasks:
                if not TASKS[task]['test_function'](options.outcomes, TASKS[task]['args']):
                    result = False

        if not result:
            sys.exit(1)
        print("SUCCESS :-)")
    except Exception: # pylint: disable=broad-except
        # Print the backtrace and exit explicitly with our chosen status.
        traceback.print_exc()
        sys.exit(120)

if __name__ == '__main__':
    main()