1# Copyright 2019, The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16Atest test event handler class. 17""" 18 19from __future__ import print_function 20from collections import deque 21from datetime import timedelta 22import logging 23 24from test_runners import test_runner_base 25 26 27EVENT_NAMES = {'module_started': 'TEST_MODULE_STARTED', 28 'module_ended': 'TEST_MODULE_ENDED', 29 'run_started': 'TEST_RUN_STARTED', 30 'run_ended': 'TEST_RUN_ENDED', 31 # Next three are test-level events 32 'test_started': 'TEST_STARTED', 33 'test_failed': 'TEST_FAILED', 34 'test_ended': 'TEST_ENDED', 35 # Last two failures are runner-level, not test-level. 36 # Invocation failure is broader than run failure. 37 'run_failed': 'TEST_RUN_FAILED', 38 'invocation_failed': 'INVOCATION_FAILED', 39 'test_ignored': 'TEST_IGNORED', 40 'test_assumption_failure': 'TEST_ASSUMPTION_FAILURE', 41 'log_association': 'LOG_ASSOCIATION'} 42 43EVENT_PAIRS = {EVENT_NAMES['module_started']: EVENT_NAMES['module_ended'], 44 EVENT_NAMES['run_started']: EVENT_NAMES['run_ended'], 45 EVENT_NAMES['test_started']: EVENT_NAMES['test_ended']} 46START_EVENTS = list(EVENT_PAIRS.keys()) 47END_EVENTS = list(EVENT_PAIRS.values()) 48TEST_NAME_TEMPLATE = '%s#%s' 49EVENTS_NOT_BALANCED = ('Error: Saw %s Start event and %s End event. These ' 50 'should be equal!') 51 52# time in millisecond. 
ONE_SECOND = 1000
ONE_MINUTE = 60000
ONE_HOUR = 3600000

# Initial parsing state for one reporting connection. A shallow copy()
# per handler is sufficient because every value here is immutable.
CONNECTION_STATE = {
    'current_test': None,
    'last_failed': None,
    'last_ignored': None,
    'last_assumption_failed': None,
    'current_group': None,
    'current_group_total': None,
    'test_count': 0,
    'test_start_time': None}

class EventHandleError(Exception):
    """Raised when handle event error."""

class EventHandler(object):
    """Test Event handle class.

    Consumes test-runner events, tracks per-run state in self.state, and
    turns completed tests (plus run/invocation failures) into TestResult
    records forwarded to the supplied result reporter.
    """

    def __init__(self, reporter, name):
        """EventHandler constructor.

        Args:
            reporter: A ResultReporter instance that receives TestResults.
            name: A string of the test runner name owning this handler.
        """
        self.reporter = reporter
        self.runner_name = name
        self.state = CONNECTION_STATE.copy()
        # Stack of START events awaiting their matching END events.
        # Created per-instance: the previous class-level deque was shared
        # by every EventHandler and corrupted balance checking whenever
        # more than one handler existed.
        self.event_stack = deque()

    def _module_started(self, event_data):
        """Reset module-level state when a test module starts."""
        self.state['current_group'] = event_data['moduleName']
        self.state['last_failed'] = None
        self.state['current_test'] = None

    def _run_started(self, event_data):
        """Reset run-level counters when a test run starts."""
        # Technically there can be more than one run per module.
        self.state['current_group_total'] = event_data['testCount']
        self.state['test_count'] = 0
        self.state['last_failed'] = None
        self.state['current_test'] = None

    def _test_started(self, event_data):
        """Record the test that just started and its start time."""
        name = TEST_NAME_TEMPLATE % (event_data['className'],
                                     event_data['testName'])
        self.state['current_test'] = name
        self.state['test_count'] += 1
        self.state['test_start_time'] = event_data['start_time']

    def _test_failed(self, event_data):
        """Buffer a failure; it is reported when the TEST_ENDED arrives."""
        self.state['last_failed'] = {'name': TEST_NAME_TEMPLATE % (
            event_data['className'],
            event_data['testName']),
                                     'trace': event_data['trace']}

    def _test_ignored(self, event_data):
        """Buffer the name of a test that was ignored."""
        name = TEST_NAME_TEMPLATE % (event_data['className'],
                                     event_data['testName'])
        self.state['last_ignored'] = name

    def _test_assumption_failure(self, event_data):
        """Buffer the name of a test that hit an assumption failure."""
        name = TEST_NAME_TEMPLATE % (event_data['className'],
                                     event_data['testName'])
        self.state['last_assumption_failed'] = name

    def _run_failed(self, event_data):
        """Report an ERROR result for a failed test run."""
        # Module and Test Run probably started, but failure occurred.
        self.reporter.process_test_result(test_runner_base.TestResult(
            runner_name=self.runner_name,
            group_name=self.state['current_group'],
            test_name=self.state['current_test'],
            status=test_runner_base.ERROR_STATUS,
            details=event_data['reason'],
            test_count=self.state['test_count'],
            test_time='',
            runner_total=None,
            group_total=self.state['current_group_total'],
            perf_info={}))

    def _invocation_failed(self, event_data):
        """Report an ERROR result for a failed invocation."""
        # Broadest possible failure. May not even start the module/test run.
        self.reporter.process_test_result(test_runner_base.TestResult(
            runner_name=self.runner_name,
            group_name=self.state['current_group'],
            test_name=self.state['current_test'],
            status=test_runner_base.ERROR_STATUS,
            details=event_data['cause'],
            test_count=self.state['test_count'],
            test_time='',
            runner_total=None,
            group_total=self.state['current_group_total'],
            perf_info={}))

    def _run_ended(self, event_data):
        """No run-level result to emit; kept for dispatch-table symmetry."""
        pass

    def _module_ended(self, event_data):
        """No module-level result to emit; kept for dispatch-table symmetry."""
        pass

    def _test_ended(self, event_data):
        """Emit the final TestResult for the test that just ended."""
        name = TEST_NAME_TEMPLATE % (event_data['className'],
                                     event_data['testName'])
        test_time = ''
        if self.state['test_start_time']:
            test_time = self._calc_duration(event_data['end_time'] -
                                            self.state['test_start_time'])
        # Failure/assumption/ignored markers were buffered by earlier
        # events; match them by name and consume them so each is reported
        # exactly once.
        if self.state['last_failed'] and name == self.state['last_failed']['name']:
            status = test_runner_base.FAILED_STATUS
            trace = self.state['last_failed']['trace']
            self.state['last_failed'] = None
        elif (self.state['last_assumption_failed'] and
              name == self.state['last_assumption_failed']):
            status = test_runner_base.ASSUMPTION_FAILED
            self.state['last_assumption_failed'] = None
            trace = None
        elif self.state['last_ignored'] and name == self.state['last_ignored']:
            status = test_runner_base.IGNORED_STATUS
            self.state['last_ignored'] = None
            trace = None
        else:
            status = test_runner_base.PASSED_STATUS
            trace = None

        # Only attach perf data when all three benchmark fields are present.
        cpu_time = event_data.get('cpu_time', None)
        real_time = event_data.get('real_time', None)
        iterations = event_data.get('iterations', None)
        perf_info = {}
        if cpu_time and real_time and iterations:
            perf_info['cpu_time'] = cpu_time
            perf_info['real_time'] = real_time
            perf_info['iterations'] = iterations

        self.reporter.process_test_result(test_runner_base.TestResult(
            runner_name=self.runner_name,
            group_name=self.state['current_group'],
            test_name=name,
            status=status,
            details=trace,
            test_count=self.state['test_count'],
            test_time=test_time,
            runner_total=None,
            perf_info=perf_info,
            group_total=self.state['current_group_total']))

    def _log_association(self, event_data):
        """Log-association events carry no reportable result."""
        pass

    # Dispatch table mapping incoming event names to handler methods.
    switch_handler = {EVENT_NAMES['module_started']: _module_started,
                      EVENT_NAMES['run_started']: _run_started,
                      EVENT_NAMES['test_started']: _test_started,
                      EVENT_NAMES['test_failed']: _test_failed,
                      EVENT_NAMES['test_ignored']: _test_ignored,
                      EVENT_NAMES['test_assumption_failure']: _test_assumption_failure,
                      EVENT_NAMES['run_failed']: _run_failed,
                      EVENT_NAMES['invocation_failed']: _invocation_failed,
                      EVENT_NAMES['test_ended']: _test_ended,
                      EVENT_NAMES['run_ended']: _run_ended,
                      EVENT_NAMES['module_ended']: _module_ended,
                      EVENT_NAMES['log_association']: _log_association}

    def process_event(self, event_name, event_data):
        """Process the events of the test run and call reporter with results.

        Args:
            event_name: A string of the event name.
            event_data: A dict of event data.
        """
        logging.debug('Processing %s %s', event_name, event_data)
        if event_name in START_EVENTS:
            self.event_stack.append(event_name)
        elif event_name in END_EVENTS:
            self._check_events_are_balanced(event_name, self.reporter)
        # dict.has_key() was removed in Python 3; membership uses 'in'.
        if event_name in self.switch_handler:
            self.switch_handler[event_name](self, event_data)
        else:
            # TODO(b/128875503): Implement the mechanism to inform not handled TF event.
            logging.debug('Event[%s] is not processable.', event_name)

    def _check_events_are_balanced(self, event_name, reporter):
        """Check Start events and End events. They should be balanced.

        If they are not balanced, bubble up any pending failure in
        state['last_failed'], then raise EventHandleError.

        Args:
            event_name: A string of the END event name being processed.
            reporter: A ResultReporter instance.
        Raises:
            EventHandleError if we don't have a balance of START/END events.
        """
        start_event = self.event_stack.pop() if self.event_stack else None
        if not start_event or EVENT_PAIRS[start_event] != event_name:
            # Here bubble up the failed trace in the situation having
            # TEST_FAILED but never receiving TEST_ENDED.
            if self.state['last_failed'] and (start_event ==
                                              EVENT_NAMES['test_started']):
                reporter.process_test_result(test_runner_base.TestResult(
                    runner_name=self.runner_name,
                    group_name=self.state['current_group'],
                    test_name=self.state['last_failed']['name'],
                    status=test_runner_base.FAILED_STATUS,
                    details=self.state['last_failed']['trace'],
                    test_count=self.state['test_count'],
                    test_time='',
                    runner_total=None,
                    group_total=self.state['current_group_total'],
                    perf_info={}))
            raise EventHandleError(EVENTS_NOT_BALANCED % (start_event,
                                                          event_name))

    @staticmethod
    def _calc_duration(duration):
        """Convert duration from ms to 3h2m43.034s.

        Args:
            duration: An int of milliseconds.

        Returns:
            string in h:m:s, m:s, s or millis, depends on the duration.
        """
        delta = timedelta(milliseconds=duration)
        timestamp = str(delta).split(':')  # hh:mm:ss.micro
        if duration < ONE_SECOND:
            return "({}ms)".format(duration)
        elif duration < ONE_MINUTE:
            return "({:.3f}s)".format(float(timestamp[2]))
        elif duration < ONE_HOUR:
            return "({0}m{1:.3f}s)".format(timestamp[1], float(timestamp[2]))
        return "({0}h{1}m{2:.3f}s)".format(timestamp[0],
                                           timestamp[1], float(timestamp[2]))