• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import argparse
8import contextlib
9import logging
10import sys
11import traceback as tb
12from dataclasses import dataclass
13from types import TracebackType
14from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type
15
16from crossbench import helper
17from crossbench.types import JsonList
18
19if TYPE_CHECKING:
20  from crossbench.types import JsonDict
21
22TInfoStack = Tuple[str, ...]
23
24TExceptionTypes = Tuple[Type[BaseException], ...]
25
26
27@dataclass
28class Entry:
29  traceback: List[str]
30  exception: BaseException
31  info_stack: TInfoStack
32
33
34class MultiException(ValueError):
35  """Default exception thrown by ExceptionAnnotator.assert_success.
36  It holds on to the ExceptionAnnotator and its previously captured exceptions
37  are automatically added to active ExceptionAnnotator in an
38  ExceptionAnnotationScope."""
39
40  def __init__(self, message: str, exceptions: ExceptionAnnotator):
41    super().__init__(message)
42    self.exceptions = exceptions
43
44  def __len__(self) -> int:
45    return len(self.exceptions)
46
47  def matching(self, *args: Type[BaseException]) -> List[BaseException]:
48    return self.exceptions.matching(*args)
49
50  @property
51  def annotator(self) -> ExceptionAnnotator:
52    return self.exceptions
53
54
55class ExceptionAnnotationScope:
56  """Used in a with-scope to annotate exceptions with a TInfoStack.
57
58  Used via the capture/annotate/info helper methods on
59  ExceptionAnnotator.
60  """
61
62  def __init__(
63      self,
64      annotator: ExceptionAnnotator,
65      exception_types: TExceptionTypes,
66      ignore_exception_types: TExceptionTypes,
67      entries: Tuple[str, ...],
68      throw_cls: Optional[Type[BaseException]] = None,
69  ) -> None:
70    logging.debug("ExceptionAnnotationScope: %s", entries)
71    self._annotator = annotator
72    self._exception_types = exception_types
73    self._ignore_exception_types = ignore_exception_types + (
74        StopIteration, GeneratorExit, StopAsyncIteration)
75    self._ignore_exception_types = ignore_exception_types
76    self._added_info_stack_entries = entries
77    self._throw_cls: Optional[Type[BaseException]] = throw_cls
78    self._previous_info_stack: TInfoStack = ()
79
80  def __enter__(self) -> ExceptionAnnotationScope:
81    self._annotator._pending_exceptions.clear()
82    self._previous_info_stack = self._annotator.info_stack
83    self._annotator._info_stack = self._previous_info_stack + (
84        self._added_info_stack_entries)
85    return self
86
87  def __exit__(self, exception_type: Optional[Type[BaseException]],
88               exception_value: Optional[BaseException],
89               traceback: Optional[TracebackType]) -> bool:
90    if not exception_value or not exception_type:
91      self._annotator._info_stack = self._previous_info_stack
92      # False => exception not handled
93      return False
94    if issubclass(exception_type, self._ignore_exception_types) and (
95        not issubclass(exception_type, MultiException)):
96      self._annotator._info_stack = self._previous_info_stack
97      # False => exception not handled, directly forward
98      return False
99    logging.debug("Intermediate Exception: %s:%s", exception_type,
100                  exception_value)
101    if self._exception_types and exception_type and (
102        issubclass(exception_type, MultiException) or
103        issubclass(exception_type, self._exception_types)):
104      # Handle matching exceptions directly here and prevent further
105      # exception handling by returning True.
106      self._annotator.append(exception_value)
107      self._annotator._info_stack = self._previous_info_stack
108      if self._throw_cls:
109        self._annotator.assert_success(
110            exception_cls=self._throw_cls,
111            log=False,
112        )
113      return True
114    if exception_value not in self._annotator._pending_exceptions:
115      self._annotator._pending_exceptions[
116          exception_value] = self._annotator.info_stack
117    # False => exception not handled
118    return False
119
120class ExceptionAnnotator:
121  """Collects exceptions with full backtraces and user-provided info stacks.
122
123  Additional stack information is constructed from active
124  ExceptionAnnotationScopes.
125  """
126
127  def __init__(self,
128               throw: bool = False,
129               throw_cls: Optional[Type[BaseException]] = None) -> None:
130    self._exceptions: List[Entry] = []
131    self.throw: bool = throw
132    self._throw_cls: Optional[Type[BaseException]] = throw_cls
133    # The info_stack adds additional meta information to handle exceptions.
134    # Unlike the source-based backtrace, this can contain dynamic information
135    # for easier debugging.
136    self._info_stack: TInfoStack = ()
137    # Associates raised exception with the info_stack at that time for later
138    # use in the `handle` method.
139    # This is cleared whenever we enter a  new ExceptionAnnotationScope.
140    self._pending_exceptions: Dict[BaseException, TInfoStack] = {}
141
142  @property
143  def is_success(self) -> bool:
144    return len(self._exceptions) == 0
145
146  @property
147  def info_stack(self) -> TInfoStack:
148    return self._info_stack
149
150  @property
151  def exceptions(self) -> List[Entry]:
152    return self._exceptions
153
154  def __getitem__(self, key: Any) -> Entry:
155    if not isinstance(key, int):
156      raise TypeError(f"Expected int key, but got: {key}")
157    return self._exceptions[key]
158
159  def __len__(self) -> int:
160    return len(self._exceptions)
161
162  def matching(self, *args: Type[BaseException]) -> List[BaseException]:
163    result = []
164    for entry in self._exceptions:
165      exception = entry.exception
166      if isinstance(exception, *args):
167        result.append(exception)
168    return result
169
170  def assert_success(self,
171                     message: Optional[str] = None,
172                     exception_cls: Type[BaseException] = MultiException,
173                     log: bool = True) -> None:
174    if self.is_success:
175      return
176    if log:
177      self.log()
178    if message is None:
179      message = "{}"
180    message = message.format(self)
181    if issubclass(exception_cls, MultiException):
182      exception = exception_cls(message, self)
183      raise exception
184    raise exception_cls(message)
185
186  def info(self, *stack_entries: str) -> ExceptionAnnotationScope:
187    """Only sets info stack entries, exceptions are passed-through."""
188    return ExceptionAnnotationScope(self, tuple(), tuple(), stack_entries)
189
190  def capture(
191      self,
192      *stack_entries: str,
193      exceptions: TExceptionTypes = (Exception,),
194      ignore: TExceptionTypes = tuple(),
195  ) -> ExceptionAnnotationScope:
196    """Sets info stack entries and captures exceptions.
197    - Does not rethrow captured exceptions
198    - Does not directly throw a MultiExceptions, unless assert_success()
199      is called. """
200    return ExceptionAnnotationScope(self, exceptions, ignore, stack_entries,
201                                    self._throw_cls)
202
203  @contextlib.contextmanager
204  def annotate(self,
205               *stack_entries,
206               exceptions: TExceptionTypes = (Exception,),
207               ignore: TExceptionTypes = tuple()):
208    """Sets info stack entries and rethrows an annotated
209      MultiException by default ."""
210    with self.capture(*stack_entries, exceptions=exceptions, ignore=ignore):
211      yield self
212    self.assert_success()
213
214  def extend(self, annotator: ExceptionAnnotator,
215             is_nested: bool = False) -> None:
216    if is_nested:
217      self._extend_with_prepended_stack_info(annotator)
218    else:
219      self._exceptions.extend(annotator.exceptions)
220
221  def _extend_with_prepended_stack_info(self,
222                                        annotator: ExceptionAnnotator) -> None:
223    if annotator == self:
224      return
225    for entry in annotator.exceptions:
226      merged_info_stack = self.info_stack + entry.info_stack
227      merged_entry = Entry(entry.traceback, entry.exception, merged_info_stack)
228      self._exceptions.append(merged_entry)
229
230  def append(self, exception: BaseException) -> None:
231    traceback_str = tb.format_exc()
232    logging.debug("Intermediate Exception %s:%s", type(exception), exception)
233    logging.debug(traceback_str)
234    traceback: List[str] = traceback_str.splitlines()
235    if isinstance(exception, KeyboardInterrupt):
236      # Fast exit on KeyboardInterrupts for a better user experience.
237      sys.exit(0)
238    if isinstance(exception, MultiException):
239      # Directly add exceptions from nested annotators.
240      self.extend(exception.exceptions, is_nested=True)
241    else:
242      stack = self.info_stack
243      if exception in self._pending_exceptions:
244        stack = self._pending_exceptions[exception]
245      self._exceptions.append(Entry(traceback, exception, stack))
246    if self.throw:
247      raise  # pylint: disable=misplaced-bare-raise
248
249  def log(self) -> None:
250    if self.is_success:
251      return
252    logging.error("=" * 80)
253    logging.error("ERRORS occurred (1/%d):", len(self._exceptions))
254    logging.error("=" * 80)
255    for entry in self._exceptions:
256      logging.debug(entry.exception)
257      logging.debug("\n".join(entry.traceback))
258      logging.debug("-" * 80)
259    is_first_entry = True
260    grouped_entries: Dict[TInfoStack, List[Entry]] = helper.group_by(
261        self._exceptions, key=lambda entry: entry.info_stack, sort_key=None)
262    for info_stack, entries in grouped_entries.items():
263      logging_level = logging.ERROR if is_first_entry else logging.DEBUG
264      is_first_entry = False
265      if info_stack:
266        info = "Info: "
267        joiner = "\n" + (" " * (len(info) - 2)) + "> "
268        message = f"{info}{joiner.join(info_stack)}"
269        logging.log(logging_level, message)
270      for entry in entries:
271        logging.log(logging_level, "- " * 40)
272        logging.log(logging_level, "Type: %s:",
273                    helper.type_name(type(entry.exception)))
274        logging.log(logging_level, "      %s", self.format_exception(entry))
275        logging_level = logging.DEBUG
276      logging.log(logging_level, "-" * 80)
277
278  def error_messages(self) -> List[str]:
279    return [self.format_exception(entry) for entry in self._exceptions]
280
281  def to_json(self) -> JsonList:
282    return [{
283        "info_stack": entry.info_stack,
284        "type": helper.type_name(type(entry.exception)),
285        "title": self.format_exception(entry),
286        "trace": entry.traceback
287    } for entry in self._exceptions]
288
289  def format_exception(self, entry: Entry) -> str:
290    msg = str(entry.exception).strip()
291    # Try to print the source line for empty AssertionError
292    if not msg and isinstance(entry.exception, AssertionError):
293      return entry.traceback[-2].strip()
294    return msg
295
296  def __str__(self) -> str:
297    if len(self._exceptions) == 1:
298      entry = self._exceptions[0]
299      stack = "\n\t".join(entry.info_stack)
300      return f"{stack}: {entry.exception}"
301
302    return "\n".join(
303        f"{entry.info_stack}: {entry.exception}" for entry in self._exceptions)
304
305
306# Expose simpler name
307Annotator = ExceptionAnnotator
308
309def annotate(
310    *stack_entries: str,
311    exceptions: TExceptionTypes = (Exception,),
312    ignore: TExceptionTypes = tuple(),
313    throw_cls: Optional[Type[BaseException]] = MultiException
314) -> ExceptionAnnotationScope:
315  """Use to annotate an exception.
316  By default this will throw a MultiException which can keep track of
317  more annotations."""
318  return ExceptionAnnotator(throw_cls=throw_cls).capture(
319      *stack_entries, exceptions=exceptions, ignore=ignore)
320
321
322class ArgumentTypeMultiException(MultiException, argparse.ArgumentTypeError):
323  pass
324
325
326def annotate_argparsing(*stack_entries: str,
327                        exceptions: TExceptionTypes = (Exception,)):
328  """Use this to annotate argument parsing-related code blocks to get more
329  readable annotated exception back.
330  - Wraps multiple exception in an ArgumentTypeMultiException
331  - Single ArgumentTypeError are raised directly
332  """
333  return annotate(
334      *stack_entries,
335      exceptions=exceptions,
336      throw_cls=ArgumentTypeMultiException)
337
338
339class UnreachableError(RuntimeError):
340  """Used for making checker tools happy in places where it's not directly
341  obvious that we always return, for instance due to using one of the above
342  exception annotations that could in theory mute exceptions and create an
343  additional return path.
344  """
345
346  def __init__(self) -> None:
347    super().__init__("Unreachable Code")
348