• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncio
2import contextvars
3import unittest
4from test import support
5
6support.requires_working_socket(module=True)
7
8
9class MyException(Exception):
10    pass
11
12
13def tearDownModule():
14    asyncio.set_event_loop_policy(None)
15
16
17class TestCM:
18    def __init__(self, ordering, enter_result=None):
19        self.ordering = ordering
20        self.enter_result = enter_result
21
22    async def __aenter__(self):
23        self.ordering.append('enter')
24        return self.enter_result
25
26    async def __aexit__(self, *exc_info):
27        self.ordering.append('exit')
28
29
30class LacksEnterAndExit:
31    pass
32class LacksEnter:
33    async def __aexit__(self, *exc_info):
34        pass
35class LacksExit:
36    async def __aenter__(self):
37        pass
38
39
40VAR = contextvars.ContextVar('VAR', default=())
41
42
43class TestAsyncCase(unittest.TestCase):
44    maxDiff = None
45
46    def setUp(self):
47        # Ensure that IsolatedAsyncioTestCase instances are destroyed before
48        # starting a new event loop
49        self.addCleanup(support.gc_collect)
50
51    def test_full_cycle(self):
52        expected = ['setUp',
53                    'asyncSetUp',
54                    'test',
55                    'asyncTearDown',
56                    'tearDown',
57                    'cleanup6',
58                    'cleanup5',
59                    'cleanup4',
60                    'cleanup3',
61                    'cleanup2',
62                    'cleanup1']
63        class Test(unittest.IsolatedAsyncioTestCase):
64            def setUp(self):
65                self.assertEqual(events, [])
66                events.append('setUp')
67                VAR.set(VAR.get() + ('setUp',))
68                self.addCleanup(self.on_cleanup1)
69                self.addAsyncCleanup(self.on_cleanup2)
70
71            async def asyncSetUp(self):
72                self.assertEqual(events, expected[:1])
73                events.append('asyncSetUp')
74                VAR.set(VAR.get() + ('asyncSetUp',))
75                self.addCleanup(self.on_cleanup3)
76                self.addAsyncCleanup(self.on_cleanup4)
77
78            async def test_func(self):
79                self.assertEqual(events, expected[:2])
80                events.append('test')
81                VAR.set(VAR.get() + ('test',))
82                self.addCleanup(self.on_cleanup5)
83                self.addAsyncCleanup(self.on_cleanup6)
84
85            async def asyncTearDown(self):
86                self.assertEqual(events, expected[:3])
87                VAR.set(VAR.get() + ('asyncTearDown',))
88                events.append('asyncTearDown')
89
90            def tearDown(self):
91                self.assertEqual(events, expected[:4])
92                events.append('tearDown')
93                VAR.set(VAR.get() + ('tearDown',))
94
95            def on_cleanup1(self):
96                self.assertEqual(events, expected[:10])
97                events.append('cleanup1')
98                VAR.set(VAR.get() + ('cleanup1',))
99                nonlocal cvar
100                cvar = VAR.get()
101
102            async def on_cleanup2(self):
103                self.assertEqual(events, expected[:9])
104                events.append('cleanup2')
105                VAR.set(VAR.get() + ('cleanup2',))
106
107            def on_cleanup3(self):
108                self.assertEqual(events, expected[:8])
109                events.append('cleanup3')
110                VAR.set(VAR.get() + ('cleanup3',))
111
112            async def on_cleanup4(self):
113                self.assertEqual(events, expected[:7])
114                events.append('cleanup4')
115                VAR.set(VAR.get() + ('cleanup4',))
116
117            def on_cleanup5(self):
118                self.assertEqual(events, expected[:6])
119                events.append('cleanup5')
120                VAR.set(VAR.get() + ('cleanup5',))
121
122            async def on_cleanup6(self):
123                self.assertEqual(events, expected[:5])
124                events.append('cleanup6')
125                VAR.set(VAR.get() + ('cleanup6',))
126
127        events = []
128        cvar = ()
129        test = Test("test_func")
130        result = test.run()
131        self.assertEqual(result.errors, [])
132        self.assertEqual(result.failures, [])
133        self.assertEqual(events, expected)
134        self.assertEqual(cvar, tuple(expected))
135
136        events = []
137        cvar = ()
138        test = Test("test_func")
139        test.debug()
140        self.assertEqual(events, expected)
141        self.assertEqual(cvar, tuple(expected))
142        test.doCleanups()
143        self.assertEqual(events, expected)
144        self.assertEqual(cvar, tuple(expected))
145
146    def test_exception_in_setup(self):
147        class Test(unittest.IsolatedAsyncioTestCase):
148            async def asyncSetUp(self):
149                events.append('asyncSetUp')
150                self.addAsyncCleanup(self.on_cleanup)
151                raise MyException()
152
153            async def test_func(self):
154                events.append('test')
155
156            async def asyncTearDown(self):
157                events.append('asyncTearDown')
158
159            async def on_cleanup(self):
160                events.append('cleanup')
161
162
163        events = []
164        test = Test("test_func")
165        result = test.run()
166        self.assertEqual(events, ['asyncSetUp', 'cleanup'])
167        self.assertIs(result.errors[0][0], test)
168        self.assertIn('MyException', result.errors[0][1])
169
170        events = []
171        test = Test("test_func")
172        self.addCleanup(test._tearDownAsyncioRunner)
173        try:
174            test.debug()
175        except MyException:
176            pass
177        else:
178            self.fail('Expected a MyException exception')
179        self.assertEqual(events, ['asyncSetUp'])
180        test.doCleanups()
181        self.assertEqual(events, ['asyncSetUp', 'cleanup'])
182
183    def test_exception_in_test(self):
184        class Test(unittest.IsolatedAsyncioTestCase):
185            async def asyncSetUp(self):
186                events.append('asyncSetUp')
187
188            async def test_func(self):
189                events.append('test')
190                self.addAsyncCleanup(self.on_cleanup)
191                raise MyException()
192
193            async def asyncTearDown(self):
194                events.append('asyncTearDown')
195
196            async def on_cleanup(self):
197                events.append('cleanup')
198
199        events = []
200        test = Test("test_func")
201        result = test.run()
202        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
203        self.assertIs(result.errors[0][0], test)
204        self.assertIn('MyException', result.errors[0][1])
205
206        events = []
207        test = Test("test_func")
208        self.addCleanup(test._tearDownAsyncioRunner)
209        try:
210            test.debug()
211        except MyException:
212            pass
213        else:
214            self.fail('Expected a MyException exception')
215        self.assertEqual(events, ['asyncSetUp', 'test'])
216        test.doCleanups()
217        self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
218
219    def test_exception_in_tear_down(self):
220        class Test(unittest.IsolatedAsyncioTestCase):
221            async def asyncSetUp(self):
222                events.append('asyncSetUp')
223
224            async def test_func(self):
225                events.append('test')
226                self.addAsyncCleanup(self.on_cleanup)
227
228            async def asyncTearDown(self):
229                events.append('asyncTearDown')
230                raise MyException()
231
232            async def on_cleanup(self):
233                events.append('cleanup')
234
235        events = []
236        test = Test("test_func")
237        result = test.run()
238        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
239        self.assertIs(result.errors[0][0], test)
240        self.assertIn('MyException', result.errors[0][1])
241
242        events = []
243        test = Test("test_func")
244        self.addCleanup(test._tearDownAsyncioRunner)
245        try:
246            test.debug()
247        except MyException:
248            pass
249        else:
250            self.fail('Expected a MyException exception')
251        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])
252        test.doCleanups()
253        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
254
255    def test_exception_in_tear_clean_up(self):
256        class Test(unittest.IsolatedAsyncioTestCase):
257            async def asyncSetUp(self):
258                events.append('asyncSetUp')
259
260            async def test_func(self):
261                events.append('test')
262                self.addAsyncCleanup(self.on_cleanup1)
263                self.addAsyncCleanup(self.on_cleanup2)
264
265            async def asyncTearDown(self):
266                events.append('asyncTearDown')
267
268            async def on_cleanup1(self):
269                events.append('cleanup1')
270                raise MyException('some error')
271
272            async def on_cleanup2(self):
273                events.append('cleanup2')
274                raise MyException('other error')
275
276        events = []
277        test = Test("test_func")
278        result = test.run()
279        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
280        self.assertIs(result.errors[0][0], test)
281        self.assertIn('MyException: other error', result.errors[0][1])
282        self.assertIn('MyException: some error', result.errors[1][1])
283
284        events = []
285        test = Test("test_func")
286        self.addCleanup(test._tearDownAsyncioRunner)
287        try:
288            test.debug()
289        except MyException:
290            pass
291        else:
292            self.fail('Expected a MyException exception')
293        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2'])
294        test.doCleanups()
295        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
296
297    def test_deprecation_of_return_val_from_test(self):
298        # Issue 41322 - deprecate return of value that is not None from a test
299        class Nothing:
300            def __eq__(self, o):
301                return o is None
302        class Test(unittest.IsolatedAsyncioTestCase):
303            async def test1(self):
304                return 1
305            async def test2(self):
306                yield 1
307            async def test3(self):
308                return Nothing()
309
310        with self.assertWarns(DeprecationWarning) as w:
311            Test('test1').run()
312        self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
313        self.assertIn('test1', str(w.warning))
314        self.assertEqual(w.filename, __file__)
315
316        with self.assertWarns(DeprecationWarning) as w:
317            Test('test2').run()
318        self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
319        self.assertIn('test2', str(w.warning))
320        self.assertEqual(w.filename, __file__)
321
322        with self.assertWarns(DeprecationWarning) as w:
323            Test('test3').run()
324        self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
325        self.assertIn('test3', str(w.warning))
326        self.assertEqual(w.filename, __file__)
327
328    def test_cleanups_interleave_order(self):
329        events = []
330
331        class Test(unittest.IsolatedAsyncioTestCase):
332            async def test_func(self):
333                self.addAsyncCleanup(self.on_sync_cleanup, 1)
334                self.addAsyncCleanup(self.on_async_cleanup, 2)
335                self.addAsyncCleanup(self.on_sync_cleanup, 3)
336                self.addAsyncCleanup(self.on_async_cleanup, 4)
337
338            async def on_sync_cleanup(self, val):
339                events.append(f'sync_cleanup {val}')
340
341            async def on_async_cleanup(self, val):
342                events.append(f'async_cleanup {val}')
343
344        test = Test("test_func")
345        test.run()
346        self.assertEqual(events, ['async_cleanup 4',
347                                  'sync_cleanup 3',
348                                  'async_cleanup 2',
349                                  'sync_cleanup 1'])
350
351    def test_base_exception_from_async_method(self):
352        events = []
353        class Test(unittest.IsolatedAsyncioTestCase):
354            async def test_base(self):
355                events.append("test_base")
356                raise BaseException()
357                events.append("not it")
358
359            async def test_no_err(self):
360                events.append("test_no_err")
361
362            async def test_cancel(self):
363                raise asyncio.CancelledError()
364
365        test = Test("test_base")
366        output = test.run()
367        self.assertFalse(output.wasSuccessful())
368
369        test = Test("test_no_err")
370        test.run()
371        self.assertEqual(events, ['test_base', 'test_no_err'])
372
373        test = Test("test_cancel")
374        output = test.run()
375        self.assertFalse(output.wasSuccessful())
376
377    def test_cancellation_hanging_tasks(self):
378        cancelled = False
379        class Test(unittest.IsolatedAsyncioTestCase):
380            async def test_leaking_task(self):
381                async def coro():
382                    nonlocal cancelled
383                    try:
384                        await asyncio.sleep(1)
385                    except asyncio.CancelledError:
386                        cancelled = True
387                        raise
388
389                # Leave this running in the background
390                asyncio.create_task(coro())
391
392        test = Test("test_leaking_task")
393        output = test.run()
394        self.assertTrue(cancelled)
395
396    def test_enterAsyncContext(self):
397        events = []
398
399        class Test(unittest.IsolatedAsyncioTestCase):
400            async def test_func(slf):
401                slf.addAsyncCleanup(events.append, 'cleanup1')
402                cm = TestCM(events, 42)
403                self.assertEqual(await slf.enterAsyncContext(cm), 42)
404                slf.addAsyncCleanup(events.append, 'cleanup2')
405                events.append('test')
406
407        test = Test('test_func')
408        output = test.run()
409        self.assertTrue(output.wasSuccessful(), output)
410        self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1'])
411
412    def test_enterAsyncContext_arg_errors(self):
413        class Test(unittest.IsolatedAsyncioTestCase):
414            async def test_func(slf):
415                with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
416                    await slf.enterAsyncContext(LacksEnterAndExit())
417                with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
418                    await slf.enterAsyncContext(LacksEnter())
419                with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
420                    await slf.enterAsyncContext(LacksExit())
421
422        test = Test('test_func')
423        output = test.run()
424        self.assertTrue(output.wasSuccessful())
425
426    def test_debug_cleanup_same_loop(self):
427        class Test(unittest.IsolatedAsyncioTestCase):
428            async def asyncSetUp(self):
429                async def coro():
430                    await asyncio.sleep(0)
431                fut = asyncio.ensure_future(coro())
432                self.addAsyncCleanup(self.cleanup, fut)
433                events.append('asyncSetUp')
434
435            async def test_func(self):
436                events.append('test')
437                raise MyException()
438
439            async def asyncTearDown(self):
440                events.append('asyncTearDown')
441
442            async def cleanup(self, fut):
443                try:
444                    # Raises an exception if in different loop
445                    await asyncio.wait([fut])
446                    events.append('cleanup')
447                except:
448                    import traceback
449                    traceback.print_exc()
450                    raise
451
452        events = []
453        test = Test("test_func")
454        result = test.run()
455        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
456        self.assertIn('MyException', result.errors[0][1])
457
458        events = []
459        test = Test("test_func")
460        self.addCleanup(test._tearDownAsyncioRunner)
461        try:
462            test.debug()
463        except MyException:
464            pass
465        else:
466            self.fail('Expected a MyException exception')
467        self.assertEqual(events, ['asyncSetUp', 'test'])
468        test.doCleanups()
469        self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
470
471    def test_setup_get_event_loop(self):
472        # See https://github.com/python/cpython/issues/95736
473        # Make sure the default event loop is not used
474        asyncio.set_event_loop(None)
475
476        class TestCase1(unittest.IsolatedAsyncioTestCase):
477            def setUp(self):
478                asyncio.get_event_loop_policy().get_event_loop()
479
480            async def test_demo1(self):
481                pass
482
483        test = TestCase1('test_demo1')
484        result = test.run()
485        self.assertTrue(result.wasSuccessful())
486
487if __name__ == "__main__":
488    unittest.main()
489