1# Copyright 2022 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import argparse 8import contextlib 9import logging 10import sys 11import traceback as tb 12from dataclasses import dataclass 13from types import TracebackType 14from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type 15 16from crossbench import helper 17from crossbench.types import JsonList 18 19if TYPE_CHECKING: 20 from crossbench.types import JsonDict 21 22TInfoStack = Tuple[str, ...] 23 24TExceptionTypes = Tuple[Type[BaseException], ...] 25 26 27@dataclass 28class Entry: 29 traceback: List[str] 30 exception: BaseException 31 info_stack: TInfoStack 32 33 34class MultiException(ValueError): 35 """Default exception thrown by ExceptionAnnotator.assert_success. 36 It holds on to the ExceptionAnnotator and its previously captured exceptions 37 are automatically added to active ExceptionAnnotator in an 38 ExceptionAnnotationScope.""" 39 40 def __init__(self, message: str, exceptions: ExceptionAnnotator): 41 super().__init__(message) 42 self.exceptions = exceptions 43 44 def __len__(self) -> int: 45 return len(self.exceptions) 46 47 def matching(self, *args: Type[BaseException]) -> List[BaseException]: 48 return self.exceptions.matching(*args) 49 50 @property 51 def annotator(self) -> ExceptionAnnotator: 52 return self.exceptions 53 54 55class ExceptionAnnotationScope: 56 """Used in a with-scope to annotate exceptions with a TInfoStack. 57 58 Used via the capture/annotate/info helper methods on 59 ExceptionAnnotator. 60 """ 61 62 def __init__( 63 self, 64 annotator: ExceptionAnnotator, 65 exception_types: TExceptionTypes, 66 ignore_exception_types: TExceptionTypes, 67 entries: Tuple[str, ...], 68 throw_cls: Optional[Type[BaseException]] = None, 69 ) -> None: 70 logging.debug("ExceptionAnnotationScope: %s", entries) 71 self._annotator = annotator 72 self._exception_types = exception_types 73 self._ignore_exception_types = ignore_exception_types + ( 74 StopIteration, GeneratorExit, StopAsyncIteration) 75 self._ignore_exception_types = ignore_exception_types 76 self._added_info_stack_entries = entries 77 self._throw_cls: Optional[Type[BaseException]] = throw_cls 78 self._previous_info_stack: TInfoStack = () 79 80 def __enter__(self) -> ExceptionAnnotationScope: 81 self._annotator._pending_exceptions.clear() 82 self._previous_info_stack = self._annotator.info_stack 83 self._annotator._info_stack = self._previous_info_stack + ( 84 self._added_info_stack_entries) 85 return self 86 87 def __exit__(self, exception_type: Optional[Type[BaseException]], 88 exception_value: Optional[BaseException], 89 traceback: Optional[TracebackType]) -> bool: 90 if not exception_value or not exception_type: 91 self._annotator._info_stack = self._previous_info_stack 92 # False => exception not handled 93 return False 94 if issubclass(exception_type, self._ignore_exception_types) and ( 95 not issubclass(exception_type, MultiException)): 96 self._annotator._info_stack = self._previous_info_stack 97 # False => exception not handled, directly forward 98 return False 99 logging.debug("Intermediate Exception: %s:%s", exception_type, 100 exception_value) 101 if self._exception_types and exception_type and ( 102 issubclass(exception_type, MultiException) or 103 issubclass(exception_type, self._exception_types)): 104 # Handle matching exceptions directly here and prevent further 105 # exception handling by returning True. 106 self._annotator.append(exception_value) 107 self._annotator._info_stack = self._previous_info_stack 108 if self._throw_cls: 109 self._annotator.assert_success( 110 exception_cls=self._throw_cls, 111 log=False, 112 ) 113 return True 114 if exception_value not in self._annotator._pending_exceptions: 115 self._annotator._pending_exceptions[ 116 exception_value] = self._annotator.info_stack 117 # False => exception not handled 118 return False 119 120class ExceptionAnnotator: 121 """Collects exceptions with full backtraces and user-provided info stacks. 122 123 Additional stack information is constructed from active 124 ExceptionAnnotationScopes. 125 """ 126 127 def __init__(self, 128 throw: bool = False, 129 throw_cls: Optional[Type[BaseException]] = None) -> None: 130 self._exceptions: List[Entry] = [] 131 self.throw: bool = throw 132 self._throw_cls: Optional[Type[BaseException]] = throw_cls 133 # The info_stack adds additional meta information to handle exceptions. 134 # Unlike the source-based backtrace, this can contain dynamic information 135 # for easier debugging. 136 self._info_stack: TInfoStack = () 137 # Associates raised exception with the info_stack at that time for later 138 # use in the `handle` method. 139 # This is cleared whenever we enter a new ExceptionAnnotationScope. 140 self._pending_exceptions: Dict[BaseException, TInfoStack] = {} 141 142 @property 143 def is_success(self) -> bool: 144 return len(self._exceptions) == 0 145 146 @property 147 def info_stack(self) -> TInfoStack: 148 return self._info_stack 149 150 @property 151 def exceptions(self) -> List[Entry]: 152 return self._exceptions 153 154 def __getitem__(self, key: Any) -> Entry: 155 if not isinstance(key, int): 156 raise TypeError(f"Expected int key, but got: {key}") 157 return self._exceptions[key] 158 159 def __len__(self) -> int: 160 return len(self._exceptions) 161 162 def matching(self, *args: Type[BaseException]) -> List[BaseException]: 163 result = [] 164 for entry in self._exceptions: 165 exception = entry.exception 166 if isinstance(exception, *args): 167 result.append(exception) 168 return result 169 170 def assert_success(self, 171 message: Optional[str] = None, 172 exception_cls: Type[BaseException] = MultiException, 173 log: bool = True) -> None: 174 if self.is_success: 175 return 176 if log: 177 self.log() 178 if message is None: 179 message = "{}" 180 message = message.format(self) 181 if issubclass(exception_cls, MultiException): 182 exception = exception_cls(message, self) 183 raise exception 184 raise exception_cls(message) 185 186 def info(self, *stack_entries: str) -> ExceptionAnnotationScope: 187 """Only sets info stack entries, exceptions are passed-through.""" 188 return ExceptionAnnotationScope(self, tuple(), tuple(), stack_entries) 189 190 def capture( 191 self, 192 *stack_entries: str, 193 exceptions: TExceptionTypes = (Exception,), 194 ignore: TExceptionTypes = tuple(), 195 ) -> ExceptionAnnotationScope: 196 """Sets info stack entries and captures exceptions. 197 - Does not rethrow captured exceptions 198 - Does not directly throw a MultiExceptions, unless assert_success() 199 is called. """ 200 return ExceptionAnnotationScope(self, exceptions, ignore, stack_entries, 201 self._throw_cls) 202 203 @contextlib.contextmanager 204 def annotate(self, 205 *stack_entries, 206 exceptions: TExceptionTypes = (Exception,), 207 ignore: TExceptionTypes = tuple()): 208 """Sets info stack entries and rethrows an annotated 209 MultiException by default .""" 210 with self.capture(*stack_entries, exceptions=exceptions, ignore=ignore): 211 yield self 212 self.assert_success() 213 214 def extend(self, annotator: ExceptionAnnotator, 215 is_nested: bool = False) -> None: 216 if is_nested: 217 self._extend_with_prepended_stack_info(annotator) 218 else: 219 self._exceptions.extend(annotator.exceptions) 220 221 def _extend_with_prepended_stack_info(self, 222 annotator: ExceptionAnnotator) -> None: 223 if annotator == self: 224 return 225 for entry in annotator.exceptions: 226 merged_info_stack = self.info_stack + entry.info_stack 227 merged_entry = Entry(entry.traceback, entry.exception, merged_info_stack) 228 self._exceptions.append(merged_entry) 229 230 def append(self, exception: BaseException) -> None: 231 traceback_str = tb.format_exc() 232 logging.debug("Intermediate Exception %s:%s", type(exception), exception) 233 logging.debug(traceback_str) 234 traceback: List[str] = traceback_str.splitlines() 235 if isinstance(exception, KeyboardInterrupt): 236 # Fast exit on KeyboardInterrupts for a better user experience. 237 sys.exit(0) 238 if isinstance(exception, MultiException): 239 # Directly add exceptions from nested annotators. 240 self.extend(exception.exceptions, is_nested=True) 241 else: 242 stack = self.info_stack 243 if exception in self._pending_exceptions: 244 stack = self._pending_exceptions[exception] 245 self._exceptions.append(Entry(traceback, exception, stack)) 246 if self.throw: 247 raise # pylint: disable=misplaced-bare-raise 248 249 def log(self) -> None: 250 if self.is_success: 251 return 252 logging.error("=" * 80) 253 logging.error("ERRORS occurred (1/%d):", len(self._exceptions)) 254 logging.error("=" * 80) 255 for entry in self._exceptions: 256 logging.debug(entry.exception) 257 logging.debug("\n".join(entry.traceback)) 258 logging.debug("-" * 80) 259 is_first_entry = True 260 grouped_entries: Dict[TInfoStack, List[Entry]] = helper.group_by( 261 self._exceptions, key=lambda entry: entry.info_stack, sort_key=None) 262 for info_stack, entries in grouped_entries.items(): 263 logging_level = logging.ERROR if is_first_entry else logging.DEBUG 264 is_first_entry = False 265 if info_stack: 266 info = "Info: " 267 joiner = "\n" + (" " * (len(info) - 2)) + "> " 268 message = f"{info}{joiner.join(info_stack)}" 269 logging.log(logging_level, message) 270 for entry in entries: 271 logging.log(logging_level, "- " * 40) 272 logging.log(logging_level, "Type: %s:", 273 helper.type_name(type(entry.exception))) 274 logging.log(logging_level, " %s", self.format_exception(entry)) 275 logging_level = logging.DEBUG 276 logging.log(logging_level, "-" * 80) 277 278 def error_messages(self) -> List[str]: 279 return [self.format_exception(entry) for entry in self._exceptions] 280 281 def to_json(self) -> JsonList: 282 return [{ 283 "info_stack": entry.info_stack, 284 "type": helper.type_name(type(entry.exception)), 285 "title": self.format_exception(entry), 286 "trace": entry.traceback 287 } for entry in self._exceptions] 288 289 def format_exception(self, entry: Entry) -> str: 290 msg = str(entry.exception).strip() 291 # Try to print the source line for empty AssertionError 292 if not msg and isinstance(entry.exception, AssertionError): 293 return entry.traceback[-2].strip() 294 return msg 295 296 def __str__(self) -> str: 297 if len(self._exceptions) == 1: 298 entry = self._exceptions[0] 299 stack = "\n\t".join(entry.info_stack) 300 return f"{stack}: {entry.exception}" 301 302 return "\n".join( 303 f"{entry.info_stack}: {entry.exception}" for entry in self._exceptions) 304 305 306# Expose simpler name 307Annotator = ExceptionAnnotator 308 309def annotate( 310 *stack_entries: str, 311 exceptions: TExceptionTypes = (Exception,), 312 ignore: TExceptionTypes = tuple(), 313 throw_cls: Optional[Type[BaseException]] = MultiException 314) -> ExceptionAnnotationScope: 315 """Use to annotate an exception. 316 By default this will throw a MultiException which can keep track of 317 more annotations.""" 318 return ExceptionAnnotator(throw_cls=throw_cls).capture( 319 *stack_entries, exceptions=exceptions, ignore=ignore) 320 321 322class ArgumentTypeMultiException(MultiException, argparse.ArgumentTypeError): 323 pass 324 325 326def annotate_argparsing(*stack_entries: str, 327 exceptions: TExceptionTypes = (Exception,)): 328 """Use this to annotate argument parsing-related code blocks to get more 329 readable annotated exception back. 330 - Wraps multiple exception in an ArgumentTypeMultiException 331 - Single ArgumentTypeError are raised directly 332 """ 333 return annotate( 334 *stack_entries, 335 exceptions=exceptions, 336 throw_cls=ArgumentTypeMultiException) 337 338 339class UnreachableError(RuntimeError): 340 """Used for making checker tools happy in places where it's not directly 341 obvious that we always return, for instance due to using one of the above 342 exception annotations that could in theory mute exceptions and create an 343 additional return path. 344 """ 345 346 def __init__(self) -> None: 347 super().__init__("Unreachable Code") 348