"""Tests support for new syntax introduced by PEP 492."""

import sys
import types
import unittest

from unittest import mock

import asyncio
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset the asyncio event loop policy after this module's tests."""
    asyncio.set_event_loop_policy(None)


# Test that asyncio.iscoroutine() uses collections.abc.Coroutine
class FakeCoro:
    """Plain object exposing send/throw/close/__await__.

    It is not created by ``async def``; it only provides the methods, so
    it is used to check that asyncio.iscoroutine() accepts such objects.
    """

    def send(self, value):
        pass

    def throw(self, typ, val=None, tb=None):
        pass

    def close(self):
        pass

    def __await__(self):
        yield


class BaseTest(test_utils.TestCase):
    """Common fixture: a BaseEventLoop whose I/O layer is mocked out."""

    def setUp(self):
        """Install a fresh BaseEventLoop with a mocked selector."""
        super().setUp()
        self.loop = asyncio.BaseEventLoop()
        self.loop._process_events = mock.Mock()
        self.loop._selector = mock.Mock()
        # The mocked selector never reports any ready events.
        self.loop._selector.select.return_value = ()
        self.set_event_loop(self.loop)


class LockTests(BaseTest):
    """Synchronization primitives used with ``async with`` / ``with await``."""

    def _make_primitives(self):
        """Return one instance of each primitive bound to self.loop.

        Passing ``loop=`` explicitly is expected to emit a
        DeprecationWarning, which is asserted here.
        """
        with self.assertWarns(DeprecationWarning):
            return [
                asyncio.Lock(loop=self.loop),
                asyncio.Condition(loop=self.loop),
                asyncio.Semaphore(loop=self.loop),
                asyncio.BoundedSemaphore(loop=self.loop),
            ]

    def test_context_manager_async_with(self):
        """``async with`` acquires on entry and releases on exit."""
        primitives = self._make_primitives()

        async def test(lock):
            await asyncio.sleep(0.01)
            self.assertFalse(lock.locked())
            async with lock as _lock:
                self.assertIsNone(_lock)
                self.assertTrue(lock.locked())
                await asyncio.sleep(0.01)
                self.assertTrue(lock.locked())
            self.assertFalse(lock.locked())

        for primitive in primitives:
            self.loop.run_until_complete(test(primitive))
            self.assertFalse(primitive.locked())

    def test_context_manager_with_await(self):
        """``with await lock`` still works but warns about deprecation."""
        primitives = self._make_primitives()

        async def test(lock):
            await asyncio.sleep(0.01)
            self.assertFalse(lock.locked())
            with self.assertWarns(DeprecationWarning):
                with await lock as _lock:
                    self.assertIsNone(_lock)
                    self.assertTrue(lock.locked())
                    await asyncio.sleep(0.01)
                    self.assertTrue(lock.locked())
                self.assertFalse(lock.locked())

        for primitive in primitives:
            self.loop.run_until_complete(test(primitive))
            self.assertFalse(primitive.locked())


class StreamReaderTests(BaseTest):
    """StreamReader used as an asynchronous iterator."""

    def test_readline(self):
        """``async for`` over a StreamReader yields the fed data by line."""
        DATA = b'line1\nline2\nline3'

        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(DATA)
        stream.feed_eof()

        async def reader():
            data = []
            async for line in stream:
                data.append(line)
            return data

        data = self.loop.run_until_complete(reader())
        self.assertEqual(data, [b'line1\n', b'line2\n', b'line3'])


class CoroutineTests(BaseTest):
    """Coroutine detection, wrapping and execution."""

    def test_iscoroutine(self):
        """iscoroutine() accepts native coroutines and FakeCoro objects."""
        async def foo():
            pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            f.close()  # silence warning

        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutinefunction(self):
        """iscoroutinefunction() accepts an ``async def`` function."""
        async def foo():
            pass

        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_function_returning_awaitable(self):
        """An @asyncio.coroutine function may return an awaitable object."""
        class Awaitable:
            def __await__(self):
                return ('spam',)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def func():
                return Awaitable()

        coro = func()
        self.assertEqual(coro.send(None), 'spam')
        coro.close()

    def test_async_def_coroutines(self):
        """Nested ``async def`` coroutines run with and without debug."""
        async def bar():
            return 'spam'

        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    def test_debug_mode_manages_coroutine_origin_tracking(self):
        """Origin tracking is on while a debug loop runs, off otherwise."""
        async def start():
            self.assertGreater(sys.get_coroutine_origin_tracking_depth(), 0)

        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
        self.loop.set_debug(True)
        self.loop.run_until_complete(start())
        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)

    def test_types_coroutine(self):
        """A @types.coroutine function returning a generator is awaitable."""
        def gen():
            yield from ()
            return 'spam'

        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapper = func()
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    def test_task_print_stack(self):
        """Task.get_stack() from inside the task reports its own frame."""
        T = None

        async def foo():
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the frame list; presumably avoids keeping the
                # running frame referenced from its own locals.
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_double_await(self):
        """Awaiting a coroutine already driven by a task raises."""
        async def afunc():
            await asyncio.sleep(0.1)

        async def runner():
            coro = afunc()
            t = self.loop.create_task(coro)
            try:
                # Yield once so the task starts running ``coro``.
                await asyncio.sleep(0)
                await coro
            finally:
                t.cancel()

        self.loop.set_debug(True)
        # assertRaises(msg=...) would only set the failure text;
        # assertRaisesRegex actually checks the exception message.
        with self.assertRaisesRegex(
                RuntimeError,
                'coroutine is being awaited already'):
            self.loop.run_until_complete(runner())


if __name__ == '__main__':
    unittest.main()