1"""Tests support for new syntax introduced by PEP 492.""" 2 3import collections.abc 4import types 5import unittest 6 7try: 8 from test import support 9except ImportError: 10 from asyncio import test_support as support 11from unittest import mock 12 13import asyncio 14from asyncio import test_utils 15 16 17class BaseTest(test_utils.TestCase): 18 19 def setUp(self): 20 super().setUp() 21 self.loop = asyncio.BaseEventLoop() 22 self.loop._process_events = mock.Mock() 23 self.loop._selector = mock.Mock() 24 self.loop._selector.select.return_value = () 25 self.set_event_loop(self.loop) 26 27 28class LockTests(BaseTest): 29 30 def test_context_manager_async_with(self): 31 primitives = [ 32 asyncio.Lock(loop=self.loop), 33 asyncio.Condition(loop=self.loop), 34 asyncio.Semaphore(loop=self.loop), 35 asyncio.BoundedSemaphore(loop=self.loop), 36 ] 37 38 async def test(lock): 39 await asyncio.sleep(0.01, loop=self.loop) 40 self.assertFalse(lock.locked()) 41 async with lock as _lock: 42 self.assertIs(_lock, None) 43 self.assertTrue(lock.locked()) 44 await asyncio.sleep(0.01, loop=self.loop) 45 self.assertTrue(lock.locked()) 46 self.assertFalse(lock.locked()) 47 48 for primitive in primitives: 49 self.loop.run_until_complete(test(primitive)) 50 self.assertFalse(primitive.locked()) 51 52 def test_context_manager_with_await(self): 53 primitives = [ 54 asyncio.Lock(loop=self.loop), 55 asyncio.Condition(loop=self.loop), 56 asyncio.Semaphore(loop=self.loop), 57 asyncio.BoundedSemaphore(loop=self.loop), 58 ] 59 60 async def test(lock): 61 await asyncio.sleep(0.01, loop=self.loop) 62 self.assertFalse(lock.locked()) 63 with await lock as _lock: 64 self.assertIs(_lock, None) 65 self.assertTrue(lock.locked()) 66 await asyncio.sleep(0.01, loop=self.loop) 67 self.assertTrue(lock.locked()) 68 self.assertFalse(lock.locked()) 69 70 for primitive in primitives: 71 self.loop.run_until_complete(test(primitive)) 72 self.assertFalse(primitive.locked()) 73 74 75class StreamReaderTests(BaseTest): 76 77 def test_readline(self): 78 DATA = b'line1\nline2\nline3' 79 80 stream = asyncio.StreamReader(loop=self.loop) 81 stream.feed_data(DATA) 82 stream.feed_eof() 83 84 async def reader(): 85 data = [] 86 async for line in stream: 87 data.append(line) 88 return data 89 90 data = self.loop.run_until_complete(reader()) 91 self.assertEqual(data, [b'line1\n', b'line2\n', b'line3']) 92 93 94class CoroutineTests(BaseTest): 95 96 def test_iscoroutine(self): 97 async def foo(): pass 98 99 f = foo() 100 try: 101 self.assertTrue(asyncio.iscoroutine(f)) 102 finally: 103 f.close() # silence warning 104 105 # Test that asyncio.iscoroutine() uses collections.abc.Coroutine 106 class FakeCoro: 107 def send(self, value): pass 108 def throw(self, typ, val=None, tb=None): pass 109 def close(self): pass 110 def __await__(self): yield 111 112 self.assertTrue(asyncio.iscoroutine(FakeCoro())) 113 114 def test_iscoroutinefunction(self): 115 async def foo(): pass 116 self.assertTrue(asyncio.iscoroutinefunction(foo)) 117 118 def test_function_returning_awaitable(self): 119 class Awaitable: 120 def __await__(self): 121 return ('spam',) 122 123 @asyncio.coroutine 124 def func(): 125 return Awaitable() 126 127 coro = func() 128 self.assertEqual(coro.send(None), 'spam') 129 coro.close() 130 131 def test_async_def_coroutines(self): 132 async def bar(): 133 return 'spam' 134 async def foo(): 135 return await bar() 136 137 # production mode 138 data = self.loop.run_until_complete(foo()) 139 self.assertEqual(data, 'spam') 140 141 # debug mode 142 self.loop.set_debug(True) 143 data = self.loop.run_until_complete(foo()) 144 self.assertEqual(data, 'spam') 145 146 @mock.patch('asyncio.coroutines.logger') 147 def test_async_def_wrapped(self, m_log): 148 async def foo(): 149 pass 150 async def start(): 151 foo_coro = foo() 152 self.assertRegex( 153 repr(foo_coro), 154 r'<CoroWrapper .*\.foo\(\) running at .*pep492.*>') 155 156 with support.check_warnings((r'.*foo.*was never', 157 RuntimeWarning)): 158 foo_coro = None 159 support.gc_collect() 160 self.assertTrue(m_log.error.called) 161 message = m_log.error.call_args[0][0] 162 self.assertRegex(message, 163 r'CoroWrapper.*foo.*was never') 164 165 self.loop.set_debug(True) 166 self.loop.run_until_complete(start()) 167 168 async def start(): 169 foo_coro = foo() 170 task = asyncio.ensure_future(foo_coro, loop=self.loop) 171 self.assertRegex(repr(task), r'Task.*foo.*running') 172 173 self.loop.run_until_complete(start()) 174 175 176 def test_types_coroutine(self): 177 def gen(): 178 yield from () 179 return 'spam' 180 181 @types.coroutine 182 def func(): 183 return gen() 184 185 async def coro(): 186 wrapper = func() 187 self.assertIsInstance(wrapper, types._GeneratorWrapper) 188 return await wrapper 189 190 data = self.loop.run_until_complete(coro()) 191 self.assertEqual(data, 'spam') 192 193 def test_task_print_stack(self): 194 T = None 195 196 async def foo(): 197 f = T.get_stack(limit=1) 198 try: 199 self.assertEqual(f[0].f_code.co_name, 'foo') 200 finally: 201 f = None 202 203 async def runner(): 204 nonlocal T 205 T = asyncio.ensure_future(foo(), loop=self.loop) 206 await T 207 208 self.loop.run_until_complete(runner()) 209 210 def test_double_await(self): 211 async def afunc(): 212 await asyncio.sleep(0.1, loop=self.loop) 213 214 async def runner(): 215 coro = afunc() 216 t = asyncio.Task(coro, loop=self.loop) 217 try: 218 await asyncio.sleep(0, loop=self.loop) 219 await coro 220 finally: 221 t.cancel() 222 223 self.loop.set_debug(True) 224 with self.assertRaisesRegex( 225 RuntimeError, 226 r'Cannot await.*test_double_await.*\bafunc\b.*while.*\bsleep\b'): 227 228 self.loop.run_until_complete(runner()) 229 230 231if __name__ == '__main__': 232 unittest.main() 233