1import asyncio 2import contextvars 3import unittest 4from test import support 5 6support.requires_working_socket(module=True) 7 8 9class MyException(Exception): 10 pass 11 12 13def tearDownModule(): 14 asyncio.set_event_loop_policy(None) 15 16 17class TestCM: 18 def __init__(self, ordering, enter_result=None): 19 self.ordering = ordering 20 self.enter_result = enter_result 21 22 async def __aenter__(self): 23 self.ordering.append('enter') 24 return self.enter_result 25 26 async def __aexit__(self, *exc_info): 27 self.ordering.append('exit') 28 29 30class LacksEnterAndExit: 31 pass 32class LacksEnter: 33 async def __aexit__(self, *exc_info): 34 pass 35class LacksExit: 36 async def __aenter__(self): 37 pass 38 39 40VAR = contextvars.ContextVar('VAR', default=()) 41 42 43class TestAsyncCase(unittest.TestCase): 44 maxDiff = None 45 46 def setUp(self): 47 # Ensure that IsolatedAsyncioTestCase instances are destroyed before 48 # starting a new event loop 49 self.addCleanup(support.gc_collect) 50 51 def test_full_cycle(self): 52 expected = ['setUp', 53 'asyncSetUp', 54 'test', 55 'asyncTearDown', 56 'tearDown', 57 'cleanup6', 58 'cleanup5', 59 'cleanup4', 60 'cleanup3', 61 'cleanup2', 62 'cleanup1'] 63 class Test(unittest.IsolatedAsyncioTestCase): 64 def setUp(self): 65 self.assertEqual(events, []) 66 events.append('setUp') 67 VAR.set(VAR.get() + ('setUp',)) 68 self.addCleanup(self.on_cleanup1) 69 self.addAsyncCleanup(self.on_cleanup2) 70 71 async def asyncSetUp(self): 72 self.assertEqual(events, expected[:1]) 73 events.append('asyncSetUp') 74 VAR.set(VAR.get() + ('asyncSetUp',)) 75 self.addCleanup(self.on_cleanup3) 76 self.addAsyncCleanup(self.on_cleanup4) 77 78 async def test_func(self): 79 self.assertEqual(events, expected[:2]) 80 events.append('test') 81 VAR.set(VAR.get() + ('test',)) 82 self.addCleanup(self.on_cleanup5) 83 self.addAsyncCleanup(self.on_cleanup6) 84 85 async def asyncTearDown(self): 86 self.assertEqual(events, expected[:3]) 87 VAR.set(VAR.get() + ('asyncTearDown',)) 88 events.append('asyncTearDown') 89 90 def tearDown(self): 91 self.assertEqual(events, expected[:4]) 92 events.append('tearDown') 93 VAR.set(VAR.get() + ('tearDown',)) 94 95 def on_cleanup1(self): 96 self.assertEqual(events, expected[:10]) 97 events.append('cleanup1') 98 VAR.set(VAR.get() + ('cleanup1',)) 99 nonlocal cvar 100 cvar = VAR.get() 101 102 async def on_cleanup2(self): 103 self.assertEqual(events, expected[:9]) 104 events.append('cleanup2') 105 VAR.set(VAR.get() + ('cleanup2',)) 106 107 def on_cleanup3(self): 108 self.assertEqual(events, expected[:8]) 109 events.append('cleanup3') 110 VAR.set(VAR.get() + ('cleanup3',)) 111 112 async def on_cleanup4(self): 113 self.assertEqual(events, expected[:7]) 114 events.append('cleanup4') 115 VAR.set(VAR.get() + ('cleanup4',)) 116 117 def on_cleanup5(self): 118 self.assertEqual(events, expected[:6]) 119 events.append('cleanup5') 120 VAR.set(VAR.get() + ('cleanup5',)) 121 122 async def on_cleanup6(self): 123 self.assertEqual(events, expected[:5]) 124 events.append('cleanup6') 125 VAR.set(VAR.get() + ('cleanup6',)) 126 127 events = [] 128 cvar = () 129 test = Test("test_func") 130 result = test.run() 131 self.assertEqual(result.errors, []) 132 self.assertEqual(result.failures, []) 133 self.assertEqual(events, expected) 134 self.assertEqual(cvar, tuple(expected)) 135 136 events = [] 137 cvar = () 138 test = Test("test_func") 139 test.debug() 140 self.assertEqual(events, expected) 141 self.assertEqual(cvar, tuple(expected)) 142 test.doCleanups() 143 self.assertEqual(events, expected) 144 self.assertEqual(cvar, tuple(expected)) 145 146 def test_exception_in_setup(self): 147 class Test(unittest.IsolatedAsyncioTestCase): 148 async def asyncSetUp(self): 149 events.append('asyncSetUp') 150 self.addAsyncCleanup(self.on_cleanup) 151 raise MyException() 152 153 async def test_func(self): 154 events.append('test') 155 156 async def asyncTearDown(self): 157 events.append('asyncTearDown') 158 159 async def on_cleanup(self): 160 events.append('cleanup') 161 162 163 events = [] 164 test = Test("test_func") 165 result = test.run() 166 self.assertEqual(events, ['asyncSetUp', 'cleanup']) 167 self.assertIs(result.errors[0][0], test) 168 self.assertIn('MyException', result.errors[0][1]) 169 170 events = [] 171 test = Test("test_func") 172 self.addCleanup(test._tearDownAsyncioRunner) 173 try: 174 test.debug() 175 except MyException: 176 pass 177 else: 178 self.fail('Expected a MyException exception') 179 self.assertEqual(events, ['asyncSetUp']) 180 test.doCleanups() 181 self.assertEqual(events, ['asyncSetUp', 'cleanup']) 182 183 def test_exception_in_test(self): 184 class Test(unittest.IsolatedAsyncioTestCase): 185 async def asyncSetUp(self): 186 events.append('asyncSetUp') 187 188 async def test_func(self): 189 events.append('test') 190 self.addAsyncCleanup(self.on_cleanup) 191 raise MyException() 192 193 async def asyncTearDown(self): 194 events.append('asyncTearDown') 195 196 async def on_cleanup(self): 197 events.append('cleanup') 198 199 events = [] 200 test = Test("test_func") 201 result = test.run() 202 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) 203 self.assertIs(result.errors[0][0], test) 204 self.assertIn('MyException', result.errors[0][1]) 205 206 events = [] 207 test = Test("test_func") 208 self.addCleanup(test._tearDownAsyncioRunner) 209 try: 210 test.debug() 211 except MyException: 212 pass 213 else: 214 self.fail('Expected a MyException exception') 215 self.assertEqual(events, ['asyncSetUp', 'test']) 216 test.doCleanups() 217 self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup']) 218 219 def test_exception_in_tear_down(self): 220 class Test(unittest.IsolatedAsyncioTestCase): 221 async def asyncSetUp(self): 222 events.append('asyncSetUp') 223 224 async def test_func(self): 225 events.append('test') 226 self.addAsyncCleanup(self.on_cleanup) 227 228 async def asyncTearDown(self): 229 events.append('asyncTearDown') 230 raise MyException() 231 232 async def on_cleanup(self): 233 events.append('cleanup') 234 235 events = [] 236 test = Test("test_func") 237 result = test.run() 238 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) 239 self.assertIs(result.errors[0][0], test) 240 self.assertIn('MyException', result.errors[0][1]) 241 242 events = [] 243 test = Test("test_func") 244 self.addCleanup(test._tearDownAsyncioRunner) 245 try: 246 test.debug() 247 except MyException: 248 pass 249 else: 250 self.fail('Expected a MyException exception') 251 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown']) 252 test.doCleanups() 253 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) 254 255 def test_exception_in_tear_clean_up(self): 256 class Test(unittest.IsolatedAsyncioTestCase): 257 async def asyncSetUp(self): 258 events.append('asyncSetUp') 259 260 async def test_func(self): 261 events.append('test') 262 self.addAsyncCleanup(self.on_cleanup1) 263 self.addAsyncCleanup(self.on_cleanup2) 264 265 async def asyncTearDown(self): 266 events.append('asyncTearDown') 267 268 async def on_cleanup1(self): 269 events.append('cleanup1') 270 raise MyException('some error') 271 272 async def on_cleanup2(self): 273 events.append('cleanup2') 274 raise MyException('other error') 275 276 events = [] 277 test = Test("test_func") 278 result = test.run() 279 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1']) 280 self.assertIs(result.errors[0][0], test) 281 self.assertIn('MyException: other error', result.errors[0][1]) 282 self.assertIn('MyException: some error', result.errors[1][1]) 283 284 events = [] 285 test = Test("test_func") 286 self.addCleanup(test._tearDownAsyncioRunner) 287 try: 288 test.debug() 289 except MyException: 290 pass 291 else: 292 self.fail('Expected a MyException exception') 293 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2']) 294 test.doCleanups() 295 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1']) 296 297 def test_deprecation_of_return_val_from_test(self): 298 # Issue 41322 - deprecate return of value that is not None from a test 299 class Nothing: 300 def __eq__(self, o): 301 return o is None 302 class Test(unittest.IsolatedAsyncioTestCase): 303 async def test1(self): 304 return 1 305 async def test2(self): 306 yield 1 307 async def test3(self): 308 return Nothing() 309 310 with self.assertWarns(DeprecationWarning) as w: 311 Test('test1').run() 312 self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) 313 self.assertIn('test1', str(w.warning)) 314 self.assertEqual(w.filename, __file__) 315 316 with self.assertWarns(DeprecationWarning) as w: 317 Test('test2').run() 318 self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) 319 self.assertIn('test2', str(w.warning)) 320 self.assertEqual(w.filename, __file__) 321 322 with self.assertWarns(DeprecationWarning) as w: 323 Test('test3').run() 324 self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) 325 self.assertIn('test3', str(w.warning)) 326 self.assertEqual(w.filename, __file__) 327 328 def test_cleanups_interleave_order(self): 329 events = [] 330 331 class Test(unittest.IsolatedAsyncioTestCase): 332 async def test_func(self): 333 self.addAsyncCleanup(self.on_sync_cleanup, 1) 334 self.addAsyncCleanup(self.on_async_cleanup, 2) 335 self.addAsyncCleanup(self.on_sync_cleanup, 3) 336 self.addAsyncCleanup(self.on_async_cleanup, 4) 337 338 async def on_sync_cleanup(self, val): 339 events.append(f'sync_cleanup {val}') 340 341 async def on_async_cleanup(self, val): 342 events.append(f'async_cleanup {val}') 343 344 test = Test("test_func") 345 test.run() 346 self.assertEqual(events, ['async_cleanup 4', 347 'sync_cleanup 3', 348 'async_cleanup 2', 349 'sync_cleanup 1']) 350 351 def test_base_exception_from_async_method(self): 352 events = [] 353 class Test(unittest.IsolatedAsyncioTestCase): 354 async def test_base(self): 355 events.append("test_base") 356 raise BaseException() 357 events.append("not it") 358 359 async def test_no_err(self): 360 events.append("test_no_err") 361 362 async def test_cancel(self): 363 raise asyncio.CancelledError() 364 365 test = Test("test_base") 366 output = test.run() 367 self.assertFalse(output.wasSuccessful()) 368 369 test = Test("test_no_err") 370 test.run() 371 self.assertEqual(events, ['test_base', 'test_no_err']) 372 373 test = Test("test_cancel") 374 output = test.run() 375 self.assertFalse(output.wasSuccessful()) 376 377 def test_cancellation_hanging_tasks(self): 378 cancelled = False 379 class Test(unittest.IsolatedAsyncioTestCase): 380 async def test_leaking_task(self): 381 async def coro(): 382 nonlocal cancelled 383 try: 384 await asyncio.sleep(1) 385 except asyncio.CancelledError: 386 cancelled = True 387 raise 388 389 # Leave this running in the background 390 asyncio.create_task(coro()) 391 392 test = Test("test_leaking_task") 393 output = test.run() 394 self.assertTrue(cancelled) 395 396 def test_enterAsyncContext(self): 397 events = [] 398 399 class Test(unittest.IsolatedAsyncioTestCase): 400 async def test_func(slf): 401 slf.addAsyncCleanup(events.append, 'cleanup1') 402 cm = TestCM(events, 42) 403 self.assertEqual(await slf.enterAsyncContext(cm), 42) 404 slf.addAsyncCleanup(events.append, 'cleanup2') 405 events.append('test') 406 407 test = Test('test_func') 408 output = test.run() 409 self.assertTrue(output.wasSuccessful(), output) 410 self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1']) 411 412 def test_enterAsyncContext_arg_errors(self): 413 class Test(unittest.IsolatedAsyncioTestCase): 414 async def test_func(slf): 415 with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): 416 await slf.enterAsyncContext(LacksEnterAndExit()) 417 with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): 418 await slf.enterAsyncContext(LacksEnter()) 419 with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): 420 await slf.enterAsyncContext(LacksExit()) 421 422 test = Test('test_func') 423 output = test.run() 424 self.assertTrue(output.wasSuccessful()) 425 426 def test_debug_cleanup_same_loop(self): 427 class Test(unittest.IsolatedAsyncioTestCase): 428 async def asyncSetUp(self): 429 async def coro(): 430 await asyncio.sleep(0) 431 fut = asyncio.ensure_future(coro()) 432 self.addAsyncCleanup(self.cleanup, fut) 433 events.append('asyncSetUp') 434 435 async def test_func(self): 436 events.append('test') 437 raise MyException() 438 439 async def asyncTearDown(self): 440 events.append('asyncTearDown') 441 442 async def cleanup(self, fut): 443 try: 444 # Raises an exception if in different loop 445 await asyncio.wait([fut]) 446 events.append('cleanup') 447 except: 448 import traceback 449 traceback.print_exc() 450 raise 451 452 events = [] 453 test = Test("test_func") 454 result = test.run() 455 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) 456 self.assertIn('MyException', result.errors[0][1]) 457 458 events = [] 459 test = Test("test_func") 460 self.addCleanup(test._tearDownAsyncioRunner) 461 try: 462 test.debug() 463 except MyException: 464 pass 465 else: 466 self.fail('Expected a MyException exception') 467 self.assertEqual(events, ['asyncSetUp', 'test']) 468 test.doCleanups() 469 self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup']) 470 471 def test_setup_get_event_loop(self): 472 # See https://github.com/python/cpython/issues/95736 473 # Make sure the default event loop is not used 474 asyncio.set_event_loop(None) 475 476 class TestCase1(unittest.IsolatedAsyncioTestCase): 477 def setUp(self): 478 asyncio.get_event_loop_policy().get_event_loop() 479 480 async def test_demo1(self): 481 pass 482 483 test = TestCase1('test_demo1') 484 result = test.run() 485 self.assertTrue(result.wasSuccessful()) 486 487if __name__ == "__main__": 488 unittest.main() 489