• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests support for new syntax introduced by PEP 492."""
2
3import sys
4import types
5import unittest
6
7from unittest import mock
8
9import asyncio
10from test.test_asyncio import utils as test_utils
11
12
def tearDownModule():
    """Reset the global asyncio event loop policy once the module is done."""
    # Passing None asks asyncio to discard any installed policy and go back
    # to its default one.
    asyncio.set_event_loop_policy(None)
15
16
# Used to test that asyncio.iscoroutine() uses collections.abc.Coroutine.
class FakeCoro:
    """Hand-written object that exposes the coroutine protocol methods.

    It is not a native coroutine: it only defines send(), throw(), close()
    and __await__(), none of which does any real work.
    """

    def send(self, value):
        return None

    def throw(self, typ, val=None, tb=None):
        return None

    def close(self):
        return None

    def __await__(self):
        # The single yield makes this method a generator function.
        yield None
30
31
class BaseTest(test_utils.TestCase):
    """Shared fixture: a BaseEventLoop with mocked selector and event hook."""

    def setUp(self):
        super().setUp()

        # Mock selector whose select() always returns an empty tuple.
        fake_selector = mock.Mock()
        fake_selector.select.return_value = ()

        # Replace the loop's event-processing hook and selector with mocks
        # before publishing the loop on the test instance.
        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = fake_selector

        self.loop = loop
        self.set_event_loop(loop)
41
42
class LockTests(BaseTest):
    """Lock, Condition, Semaphore and BoundedSemaphore as context managers."""

    def _make_primitives(self):
        # Creating the primitives with an explicit loop= argument is expected
        # to emit a DeprecationWarning, so build them under assertWarns.
        with self.assertWarns(DeprecationWarning):
            return [
                asyncio.Lock(loop=self.loop),
                asyncio.Condition(loop=self.loop),
                asyncio.Semaphore(loop=self.loop),
                asyncio.BoundedSemaphore(loop=self.loop),
            ]

    def test_context_manager_async_with(self):
        async def check(primitive):
            await asyncio.sleep(0.01)
            self.assertFalse(primitive.locked())
            async with primitive as bound:
                # Entering acquires the primitive but binds None.
                self.assertIsNone(bound)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01)
                self.assertTrue(primitive.locked())
            # Leaving the block releases it again.
            self.assertFalse(primitive.locked())

        for primitive in self._make_primitives():
            self.loop.run_until_complete(check(primitive))
            self.assertFalse(primitive.locked())

    def test_context_manager_with_await(self):
        async def check(primitive):
            await asyncio.sleep(0.01)
            self.assertFalse(primitive.locked())
            # The legacy "with await <primitive>" form is expected to emit
            # a DeprecationWarning.
            with self.assertWarns(DeprecationWarning):
                with await primitive as bound:
                    self.assertIsNone(bound)
                    self.assertTrue(primitive.locked())
                    await asyncio.sleep(0.01)
                    self.assertTrue(primitive.locked())
                self.assertFalse(primitive.locked())

        for primitive in self._make_primitives():
            self.loop.run_until_complete(check(primitive))
            self.assertFalse(primitive.locked())
91
92
class StreamReaderTests(BaseTest):
    """StreamReader used as the target of ``async for``."""

    def test_readline(self):
        payload = b'line1\nline2\nline3'
        expected = [b'line1\n', b'line2\n', b'line3']

        reader = asyncio.StreamReader(loop=self.loop)
        reader.feed_data(payload)
        reader.feed_eof()

        async def collect_lines():
            # Iterating the reader asynchronously produces one line per step.
            return [line async for line in reader]

        lines = self.loop.run_until_complete(collect_lines())
        self.assertEqual(lines, expected)
110
111
class CoroutineTests(BaseTest):
    """Coroutine detection and execution checks for ``async def`` code."""

    def test_iscoroutine(self):
        async def foo(): pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            # Close the never-awaited coroutine to silence the warning.
            f.close()

        # An object that merely implements the coroutine methods counts too.
        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutinefunction(self):
        async def foo(): pass
        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_function_returning_awaitable(self):
        # A legacy @asyncio.coroutine function returning an awaitable object:
        # the first send() must produce the first item from __await__().
        class Awaitable:
            def __await__(self):
                return ('spam',)

        # Applying the decorator is expected to emit a DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def func():
                return Awaitable()

        coro = func()
        self.assertEqual(coro.send(None), 'spam')
        coro.close()

    def test_async_def_coroutines(self):
        async def bar():
            return 'spam'

        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    def test_debug_mode_manages_coroutine_origin_tracking(self):
        # The tracking depth must be 0 before and after the run, and
        # positive while the debug-mode loop is executing the coroutine.
        async def start():
            self.assertGreater(sys.get_coroutine_origin_tracking_depth(), 0)

        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
        self.loop.set_debug(True)
        self.loop.run_until_complete(start())
        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)

    def test_types_coroutine(self):
        def gen():
            yield from ()
            return 'spam'

        @types.coroutine
        def func():
            return gen()

        async def coro():
            # Calling func() must hand back a wrapper that can be awaited.
            wrapper = func()
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    def test_task_print_stack(self):
        # T is filled in by runner() so that foo() can inspect its own task.
        T = None

        async def foo():
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the frame list (presumably to avoid keeping the
                # frames alive through a reference cycle).
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_double_await(self):
        async def afunc():
            await asyncio.sleep(0.1)

        async def runner():
            coro = afunc()
            t = self.loop.create_task(coro)
            try:
                # Give the task a chance to start running coro ...
                await asyncio.sleep(0)
                # ... then await the very same coroutine object again.
                await coro
            finally:
                t.cancel()

        self.loop.set_debug(True)
        # assertRaisesRegex checks the exception text; with assertRaises the
        # msg= keyword is only the message reported when the test fails.
        with self.assertRaisesRegex(
                RuntimeError,
                'coroutine is being awaited already'):
            self.loop.run_until_complete(runner())
220
221
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
224