• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import argparse
6import unittest
7from contextlib import contextmanager
8from unittest import mock
9
10from crossbench.exception import (ArgumentTypeMultiException, Entry,
11                                  ExceptionAnnotator, MultiException, annotate,
12                                  annotate_argparsing)
13from tests import test_helper
14
15
class CustomException(Exception):
  """Plain Exception subclass raised by the tests in this file."""
18
19
class CustomException2(Exception):
  """Second plain Exception subclass, distinct from CustomException."""
22
23
class CustomValueError(ValueError):
  """ValueError subclass used to test matching on exception subclasses."""
26
class ExceptionHandlerTestCase(unittest.TestCase):
  """Tests for ExceptionAnnotator, MultiException and the annotate helpers."""

  def test_invalid_get_item(self):
    annotator = ExceptionAnnotator()
    # A string key and an out-of-range index on a fresh annotator must raise.
    with self.assertRaises(TypeError):
      _ = annotator["invalid key"]
    with self.assertRaises(IndexError):
      _ = annotator[1]

  def test_getitem(self):
    annotator = ExceptionAnnotator()
    # Each capture() block swallows its exception and records one entry.
    with annotator.capture("exception"):
      raise CustomException("AAA")
    with annotator.capture("exception"):
      raise CustomException("BBB")
    self.assertEqual(len(annotator), 2)
    entry_0 = annotator[0]
    entry_1 = annotator[1]
    self.assertIsNot(entry_0, entry_1)
    # Negative indices address entries from the end.
    self.assertIs(annotator[-1], entry_1)
    with self.assertRaises(IndexError):
      _ = annotator[2]

  def test_annotate(self):
    with self.assertRaises(MultiException) as cm:
      with annotate("BBB"):
        with annotate("AAA"):
          raise ValueError("an exception")
    exception: MultiException = cm.exception
    annotator: ExceptionAnnotator = exception.annotator
    # assertEqual, not assertTrue: assertTrue(x, 1) only checks truthiness and
    # uses 1 as the failure message.
    self.assertEqual(len(annotator), 1)
    entry: Entry = annotator[0]
    # The info stack is ordered from the outermost to the innermost scope.
    self.assertTupleEqual(entry.info_stack, ("BBB", "AAA"))
    self.assertIsInstance(entry.exception, ValueError)

  def test_annotate_argparse(self):
    with self.assertRaises(ArgumentTypeMultiException) as cm:
      with annotate_argparsing("BBB"):
        with annotate("AAA"):
          with annotate("000"):
            raise ValueError("an exception")
    exception: MultiException = cm.exception
    self.assertIsInstance(exception, argparse.ArgumentTypeError)
    annotator: ExceptionAnnotator = exception.annotator
    # Exactly one entry is expected despite the three nested scopes.
    self.assertEqual(len(annotator), 1)
    entry: Entry = annotator[0]
    self.assertTupleEqual(entry.info_stack, ("BBB", "AAA", "000"))
    self.assertIsInstance(entry.exception, ValueError)

  def test_annotate_argparse_nested(self):
    with self.assertRaises(ArgumentTypeMultiException) as cm:
      with annotate_argparsing("BBB"):
        with annotate_argparsing("AAA"):
          with annotate_argparsing("000"):
            raise CustomValueError("an exception")
    exception: MultiException = cm.exception
    self.assertIsInstance(exception, argparse.ArgumentTypeError)
    annotator: ExceptionAnnotator = exception.annotator
    # Exactly one entry is expected despite the three nested scopes.
    self.assertEqual(len(annotator), 1)
    entry: Entry = annotator[0]
    self.assertListEqual(
        annotator.matching(CustomValueError), [
            entry.exception,
        ])
    self.assertTupleEqual(entry.info_stack, ("BBB", "AAA", "000"))
    self.assertIsInstance(entry.exception, ValueError)

  def test_annotate_argparse_pass_through(self):
    with self.assertRaises(ArgumentTypeMultiException) as cm:
      with annotate_argparsing("BBB"):
        with annotate_argparsing("AAA"):
          with annotate_argparsing("000"):
            raise argparse.ArgumentTypeError("some arg type error")
    self.assertEqual(len(cm.exception), 1)
    # The originally raised ArgumentTypeError must be retrievable unchanged.
    exception: argparse.ArgumentTypeError = cm.exception.matching(
        argparse.ArgumentTypeError)[0]
    self.assertIsInstance(exception, argparse.ArgumentTypeError)
    self.assertEqual(str(exception), "some arg type error")

  def test_annotate_collecting(self):
    annotator = ExceptionAnnotator()
    with self.assertRaises(MultiException) as cm:
      with annotator.annotate("AAA"):
        with annotator.annotate("000"):
          raise ValueError("an exception")
    exception: MultiException = cm.exception
    self.assertIsInstance(exception, MultiException)
    self.assertFalse(annotator.is_success)
    # Exactly one entry is expected for the single raised exception.
    self.assertEqual(len(annotator), 1)
    entry: Entry = annotator[0]
    self.assertTupleEqual(entry.info_stack, ("AAA", "000"))
    self.assertIsInstance(entry.exception, ValueError)

  def test_empty(self):
    annotator = ExceptionAnnotator()
    self.assertTrue(annotator.is_success)
    self.assertEqual(len(annotator), 0)
    self.assertListEqual(annotator.to_json(), [])
    with mock.patch("logging.error") as logging_mock:
      annotator.log()
    # No exceptions => no error output
    logging_mock.assert_not_called()

  def test_handle_exception(self):
    annotator = ExceptionAnnotator()
    exception = ValueError("custom message")
    # Raise and catch first, then hand the caught exception to append().
    try:
      raise exception
    except ValueError as e:
      annotator.append(e)
    self.assertFalse(annotator.is_success)
    serialized = annotator.to_json()
    self.assertEqual(len(serialized), 1)
    self.assertEqual(serialized[0]["title"], str(exception))
    with mock.patch("logging.debug") as logging_mock:
      annotator.log()
    logging_mock.assert_has_calls([mock.call(exception)])

  def test_handle_rethrow(self):
    annotator = ExceptionAnnotator(throw=True)
    exception = ValueError("custom message")
    # With throw=True, append() is expected to re-raise the exception.
    with self.assertRaises(ValueError) as cm:
      try:
        raise exception
      except ValueError as e:
        annotator.append(e)
    self.assertEqual(cm.exception, exception)
    # The exception is still recorded even though it was re-raised.
    self.assertFalse(annotator.is_success)
    serialized = annotator.to_json()
    self.assertEqual(len(serialized), 1)
    self.assertEqual(serialized[0]["title"], str(exception))

  def test_info_stack(self):
    annotator = ExceptionAnnotator(throw=True)
    exception = ValueError("custom message")
    with self.assertRaises(ValueError) as cm, annotator.info(
        "info 1", "info 2"):
      # The info stack is visible while the info() scope is active.
      self.assertTupleEqual(annotator.info_stack, ("info 1", "info 2"))
      try:
        raise exception
      except ValueError as e:
        annotator.append(e)
    self.assertEqual(cm.exception, exception)
    self.assertFalse(annotator.is_success)
    self.assertEqual(len(annotator), 1)
    entry = annotator[0]
    self.assertTupleEqual(entry.info_stack, ("info 1", "info 2"))
    serialized = annotator.to_json()
    self.assertEqual(len(serialized), 1)
    self.assertEqual(serialized[0]["title"], str(exception))
    self.assertEqual(serialized[0]["info_stack"], ("info 1", "info 2"))

  def test_info_stack_logging(self):
    annotator = ExceptionAnnotator()
    # The exception leaves the info() scope before it is appended.
    try:
      with annotator.info("info 1", "info 2"):
        raise ValueError("custom message")
    except ValueError as e:
      annotator.append(e)
    with self.assertLogs(level="ERROR") as cm:
      annotator.log()
    output = "\n".join(cm.output)
    self.assertIn("info 1", output)
    self.assertIn("info 2", output)
    self.assertIn("custom message", output)

  def test_handle_keyboard_interrupt(self):
    annotator = ExceptionAnnotator()
    keyboard_interrupt = KeyboardInterrupt()
    # sys.exit is patched to raise ValueError so that the exit call can be
    # observed without terminating the test process.
    with mock.patch("sys.exit", side_effect=ValueError) as exit_mock:
      with self.assertRaises(ValueError) as cm:
        try:
          raise keyboard_interrupt
        except KeyboardInterrupt as e:
          annotator.append(e)
      self.assertNotEqual(cm.exception, keyboard_interrupt)
    exit_mock.assert_called_once_with(0)

  def test_extend(self):
    annotator_1 = ExceptionAnnotator()
    try:
      raise ValueError("error_1")
    except ValueError as e:
      annotator_1.append(e)
    annotator_2 = ExceptionAnnotator()
    try:
      raise ValueError("error_2")
    except ValueError as e:
      annotator_2.append(e)
    annotator_3 = ExceptionAnnotator()
    annotator_4 = ExceptionAnnotator()
    self.assertFalse(annotator_1.is_success)
    self.assertFalse(annotator_2.is_success)
    self.assertTrue(annotator_3.is_success)
    self.assertTrue(annotator_4.is_success)

    # Extending a non-empty annotator adds the other annotator's entries and
    # leaves the source annotator untouched.
    self.assertEqual(len(annotator_1), 1)
    self.assertEqual(len(annotator_2), 1)
    annotator_2.extend(annotator_1)
    self.assertEqual(len(annotator_2), 2)
    self.assertFalse(annotator_1.is_success)
    self.assertFalse(annotator_2.is_success)

    # Extending with an empty annotator adds nothing.
    self.assertEqual(len(annotator_1), 1)
    self.assertEqual(len(annotator_3), 0)
    self.assertEqual(len(annotator_4), 0)
    annotator_3.extend(annotator_1)
    annotator_3.extend(annotator_4)
    self.assertEqual(len(annotator_3), 1)
    self.assertFalse(annotator_3.is_success)
    self.assertTrue(annotator_4.is_success)

  def test_extend_nested(self):
    annotator_1 = ExceptionAnnotator()
    annotator_2 = ExceptionAnnotator()
    exception_1 = ValueError("error_1")
    exception_2 = ValueError("error_2")
    with annotator_1.capture("info 1", "info 2", exceptions=(ValueError,)):
      raise exception_1
    self.assertEqual(len(annotator_1), 1)
    self.assertEqual(len(annotator_2), 0)
    with annotator_1.info("info 1", "info 2"):
      with annotator_2.capture("info 3", "info 4", exceptions=(ValueError,)):
        raise exception_2
      # extend() runs while annotator_1's info scope is still active.
      annotator_1.extend(annotator_2, is_nested=True)
    self.assertEqual(len(annotator_1), 2)
    self.assertEqual(len(annotator_2), 1)
    self.assertTupleEqual(annotator_1[0].info_stack, ("info 1", "info 2"))
    # The nested entry gets annotator_1's info stack prepended.
    self.assertTupleEqual(annotator_1[1].info_stack,
                          ("info 1", "info 2", "info 3", "info 4"))
    # The entry in the source annotator keeps its original info stack.
    self.assertTupleEqual(annotator_2[0].info_stack, ("info 3", "info 4"))

  def test_contextmanager(self):
    annotator = ExceptionAnnotator()

    @contextmanager
    def context(value):
      with annotator.capture("entry"):
        yield value

    # The exception raised in the with-body is captured, not propagated.
    with context("custom value") as context_value:
      raise CustomException("custom exception")

    self.assertEqual(context_value, "custom value")
    self.assertFalse(annotator.is_success)
    self.assertEqual(len(annotator), 1)
    self.assertIn("custom exception", str(annotator[0]))

  def test_contextmanager_raise_before_yield_capture(self):

    @contextmanager
    def context_simple(value):
      raise RuntimeError("exception before yield")

    # Baseline without an annotator: the with-body never runs.
    did_run = False
    with self.assertRaises(RuntimeError) as cm:
      with context_simple("custom value 1"):
        did_run = True
        raise CustomException("custom exception 1")
    self.assertFalse(did_run)
    self.assertIn("exception before yield", str(cm.exception))

    annotator = ExceptionAnnotator()

    @contextmanager
    def context_capture(value):
      with annotator.capture("entry"):
        raise RuntimeError("exception before yield")
      yield value

    # capture() swallows the RuntimeError, so the generator reaches its yield
    # and the with-body runs; the body's own exception then propagates.
    did_run = False
    with self.assertRaises(CustomException) as cm:
      with context_capture("custom value 2"):
        did_run = True
        raise CustomException("custom exception 2")
    self.assertTrue(did_run)
    self.assertIn("custom exception 2", str(cm.exception))
    self.assertFalse(annotator.is_success)
    self.assertEqual(len(annotator), 1)
    self.assertIsInstance(annotator[0].exception, RuntimeError)

  def test_contextmanager_raise_before_yield_annotate(self):
    annotator = ExceptionAnnotator()

    @contextmanager
    def context_annotate(value):
      with annotator.annotate("entry"):
        raise RuntimeError("exception before yield")
      yield value

    # annotate() re-raises as MultiException, so the with-body never runs.
    did_run = False
    with self.assertRaises(MultiException) as cm:
      with context_annotate("custom value 3"):
        did_run = True
        raise CustomException("custom exception 3")
    self.assertFalse(did_run)
    self.assertIn("exception before yield", str(cm.exception))
    self.assertFalse(annotator.is_success)
    self.assertEqual(len(annotator), 1)
    self.assertIsInstance(annotator[0].exception, RuntimeError)
327
328
329
if __name__ == "__main__":
  # When executed directly as a script, hand this file to
  # test_helper.run_pytest.
  test_helper.run_pytest(__file__)
332