• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests support for new syntax introduced by PEP 492."""
2
3import collections.abc
4import types
5import unittest
6
7try:
8    from test import support
9except ImportError:
10    from asyncio import test_support as support
11from unittest import mock
12
13import asyncio
14from asyncio import test_utils
15
16
17class BaseTest(test_utils.TestCase):
18
19    def setUp(self):
20        super().setUp()
21        self.loop = asyncio.BaseEventLoop()
22        self.loop._process_events = mock.Mock()
23        self.loop._selector = mock.Mock()
24        self.loop._selector.select.return_value = ()
25        self.set_event_loop(self.loop)
26
27
28class LockTests(BaseTest):
29
30    def test_context_manager_async_with(self):
31        primitives = [
32            asyncio.Lock(loop=self.loop),
33            asyncio.Condition(loop=self.loop),
34            asyncio.Semaphore(loop=self.loop),
35            asyncio.BoundedSemaphore(loop=self.loop),
36        ]
37
38        async def test(lock):
39            await asyncio.sleep(0.01, loop=self.loop)
40            self.assertFalse(lock.locked())
41            async with lock as _lock:
42                self.assertIs(_lock, None)
43                self.assertTrue(lock.locked())
44                await asyncio.sleep(0.01, loop=self.loop)
45                self.assertTrue(lock.locked())
46            self.assertFalse(lock.locked())
47
48        for primitive in primitives:
49            self.loop.run_until_complete(test(primitive))
50            self.assertFalse(primitive.locked())
51
52    def test_context_manager_with_await(self):
53        primitives = [
54            asyncio.Lock(loop=self.loop),
55            asyncio.Condition(loop=self.loop),
56            asyncio.Semaphore(loop=self.loop),
57            asyncio.BoundedSemaphore(loop=self.loop),
58        ]
59
60        async def test(lock):
61            await asyncio.sleep(0.01, loop=self.loop)
62            self.assertFalse(lock.locked())
63            with await lock as _lock:
64                self.assertIs(_lock, None)
65                self.assertTrue(lock.locked())
66                await asyncio.sleep(0.01, loop=self.loop)
67                self.assertTrue(lock.locked())
68            self.assertFalse(lock.locked())
69
70        for primitive in primitives:
71            self.loop.run_until_complete(test(primitive))
72            self.assertFalse(primitive.locked())
73
74
75class StreamReaderTests(BaseTest):
76
77    def test_readline(self):
78        DATA = b'line1\nline2\nline3'
79
80        stream = asyncio.StreamReader(loop=self.loop)
81        stream.feed_data(DATA)
82        stream.feed_eof()
83
84        async def reader():
85            data = []
86            async for line in stream:
87                data.append(line)
88            return data
89
90        data = self.loop.run_until_complete(reader())
91        self.assertEqual(data, [b'line1\n', b'line2\n', b'line3'])
92
93
94class CoroutineTests(BaseTest):
95
96    def test_iscoroutine(self):
97        async def foo(): pass
98
99        f = foo()
100        try:
101            self.assertTrue(asyncio.iscoroutine(f))
102        finally:
103            f.close() # silence warning
104
105        # Test that asyncio.iscoroutine() uses collections.abc.Coroutine
106        class FakeCoro:
107            def send(self, value): pass
108            def throw(self, typ, val=None, tb=None): pass
109            def close(self): pass
110            def __await__(self): yield
111
112        self.assertTrue(asyncio.iscoroutine(FakeCoro()))
113
114    def test_iscoroutinefunction(self):
115        async def foo(): pass
116        self.assertTrue(asyncio.iscoroutinefunction(foo))
117
118    def test_function_returning_awaitable(self):
119        class Awaitable:
120            def __await__(self):
121                return ('spam',)
122
123        @asyncio.coroutine
124        def func():
125            return Awaitable()
126
127        coro = func()
128        self.assertEqual(coro.send(None), 'spam')
129        coro.close()
130
131    def test_async_def_coroutines(self):
132        async def bar():
133            return 'spam'
134        async def foo():
135            return await bar()
136
137        # production mode
138        data = self.loop.run_until_complete(foo())
139        self.assertEqual(data, 'spam')
140
141        # debug mode
142        self.loop.set_debug(True)
143        data = self.loop.run_until_complete(foo())
144        self.assertEqual(data, 'spam')
145
146    @mock.patch('asyncio.coroutines.logger')
147    def test_async_def_wrapped(self, m_log):
148        async def foo():
149            pass
150        async def start():
151            foo_coro = foo()
152            self.assertRegex(
153                repr(foo_coro),
154                r'<CoroWrapper .*\.foo\(\) running at .*pep492.*>')
155
156            with support.check_warnings((r'.*foo.*was never',
157                                         RuntimeWarning)):
158                foo_coro = None
159                support.gc_collect()
160                self.assertTrue(m_log.error.called)
161                message = m_log.error.call_args[0][0]
162                self.assertRegex(message,
163                                 r'CoroWrapper.*foo.*was never')
164
165        self.loop.set_debug(True)
166        self.loop.run_until_complete(start())
167
168        async def start():
169            foo_coro = foo()
170            task = asyncio.ensure_future(foo_coro, loop=self.loop)
171            self.assertRegex(repr(task), r'Task.*foo.*running')
172
173        self.loop.run_until_complete(start())
174
175
176    def test_types_coroutine(self):
177        def gen():
178            yield from ()
179            return 'spam'
180
181        @types.coroutine
182        def func():
183            return gen()
184
185        async def coro():
186            wrapper = func()
187            self.assertIsInstance(wrapper, types._GeneratorWrapper)
188            return await wrapper
189
190        data = self.loop.run_until_complete(coro())
191        self.assertEqual(data, 'spam')
192
193    def test_task_print_stack(self):
194        T = None
195
196        async def foo():
197            f = T.get_stack(limit=1)
198            try:
199                self.assertEqual(f[0].f_code.co_name, 'foo')
200            finally:
201                f = None
202
203        async def runner():
204            nonlocal T
205            T = asyncio.ensure_future(foo(), loop=self.loop)
206            await T
207
208        self.loop.run_until_complete(runner())
209
210    def test_double_await(self):
211        async def afunc():
212            await asyncio.sleep(0.1, loop=self.loop)
213
214        async def runner():
215            coro = afunc()
216            t = asyncio.Task(coro, loop=self.loop)
217            try:
218                await asyncio.sleep(0, loop=self.loop)
219                await coro
220            finally:
221                t.cancel()
222
223        self.loop.set_debug(True)
224        with self.assertRaisesRegex(
225            RuntimeError,
226            r'Cannot await.*test_double_await.*\bafunc\b.*while.*\bsleep\b'):
227
228            self.loop.run_until_complete(runner())
229
230
231if __name__ == '__main__':
232    unittest.main()
233