• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncio
2import unittest
3
4from unittest import mock
5from . import utils as test_utils
6
7
class TestPolicy(asyncio.AbstractEventLoopPolicy):
    """Event loop policy stub that builds loops with a caller-supplied factory.

    The last non-None loop handed to ``set_event_loop`` is kept on
    ``self.loop`` so it can be inspected afterwards (BaseTest.tearDown
    checks that it has been closed).
    """

    def __init__(self, loop_factory):
        # No loop has been installed through this policy yet.
        self.loop = None
        self.loop_factory = loop_factory

    def get_event_loop(self):
        """Always fail: asyncio.run() is not expected to call this."""
        raise RuntimeError

    def new_event_loop(self):
        """Return a new loop produced by the stored factory."""
        make_loop = self.loop_factory
        return make_loop()

    def set_event_loop(self, loop):
        """Remember *loop*; a ``None`` argument leaves the stored loop as is."""
        if loop is None:
            return
        # Retained so that BaseTest.tearDown can check the loop was closed.
        self.loop = loop
26
27
class BaseTest(unittest.TestCase):
    """Common fixture: installs a TestPolicy that hands out stubbed loops.

    ``setUp`` installs the policy globally; ``tearDown`` checks the loop that
    was registered with the policy (if any) and always restores the default
    policy afterwards.
    """

    def new_loop(self):
        """Create a ``BaseEventLoop`` whose I/O hooks are replaced by mocks.

        The returned loop carries a ``shutdown_ag_run`` flag, initially
        ``False``, which is flipped to ``True`` when the replacement
        ``shutdown_asyncgens`` coroutine is run.
        """
        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = mock.Mock()
        # The stub selector never reports any ready events.
        loop._selector.select.return_value = ()
        loop.shutdown_ag_run = False

        async def shutdown_asyncgens():
            # Only records that the shutdown hook was awaited.
            loop.shutdown_ag_run = True
        loop.shutdown_asyncgens = shutdown_asyncgens

        return loop

    def setUp(self):
        """Install a TestPolicy that creates loops through ``new_loop``."""
        super().setUp()

        policy = TestPolicy(self.new_loop)
        asyncio.set_event_loop_policy(policy)

    def tearDown(self):
        """Verify the registered loop was shut down, then reset the policy.

        Raises an assertion failure if a loop was registered with the policy
        but is not closed or never had ``shutdown_asyncgens`` run.
        """
        try:
            policy = asyncio.get_event_loop_policy()
            if policy.loop is not None:
                self.assertTrue(policy.loop.is_closed())
                self.assertTrue(policy.loop.shutdown_ag_run)
        finally:
            # Restore the default policy even when a check above fails, so a
            # failing test cannot leak the test policy into later tests.
            asyncio.set_event_loop_policy(None)
            super().tearDown()
57
58
class RunTests(BaseTest):
    """Tests for ``asyncio.run()`` driven through the BaseTest loop policy."""

    def test_asyncio_run_return(self):
        """asyncio.run() returns the value returned by the coroutine."""
        async def main():
            await asyncio.sleep(0)
            return 42

        self.assertEqual(asyncio.run(main()), 42)

    def test_asyncio_run_raises(self):
        """An exception raised by the coroutine propagates out of run()."""
        async def main():
            await asyncio.sleep(0)
            raise ValueError('spam')

        with self.assertRaisesRegex(ValueError, 'spam'):
            asyncio.run(main())

    def test_asyncio_run_only_coro(self):
        """Objects that are not coroutines are rejected with ValueError."""
        # A tuple keeps the sub-test order deterministic.
        for o in (1, lambda: None):
            with self.subTest(obj=o), \
                    self.assertRaisesRegex(ValueError,
                                           'a coroutine was expected'):
                asyncio.run(o)

    def test_asyncio_run_debug(self):
        """The loop's debug flag follows ``debug=`` or the patched default."""
        async def main(expected):
            # Same accessor as the other tests in this class.
            loop = asyncio.get_running_loop()
            self.assertIs(loop.get_debug(), expected)

        asyncio.run(main(False))
        asyncio.run(main(True), debug=True)
        with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
            # With no explicit argument the patched default applies ...
            asyncio.run(main(True))
            # ... while an explicit debug=False still wins.
            asyncio.run(main(False), debug=False)

    def test_asyncio_run_from_running_loop(self):
        """Calling asyncio.run() inside a running loop raises RuntimeError."""
        async def main():
            coro = main()
            try:
                asyncio.run(coro)
            finally:
                coro.close()  # Suppress ResourceWarning

        with self.assertRaisesRegex(RuntimeError,
                                    'cannot be called from a running'):
            asyncio.run(main())

    def test_asyncio_run_cancels_hanging_tasks(self):
        """A task still pending when main() returns is done after run()."""
        lo_task = None

        async def leftover():
            await asyncio.sleep(0.1)

        async def main():
            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

    def test_asyncio_run_reports_hanging_tasks_errors(self):
        """An error raised by a leftover task on cancellation is reported.

        The loop's ``call_exception_handler`` is replaced by a mock, which is
        expected to receive the task and a ZeroDivisionError instance.
        """
        lo_task = None
        call_exc_handler_mock = mock.Mock()

        async def leftover():
            try:
                await asyncio.sleep(0.1)
            except asyncio.CancelledError:
                # Deliberately fail while handling the cancellation.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            loop.call_exception_handler = call_exc_handler_mock

            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

        call_exc_handler_mock.assert_called_with({
            'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
            'task': lo_task,
            'exception': test_utils.MockInstanceOf(ZeroDivisionError)
        })

    def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
        """The async generator is finished even if main() and a task fail.

        main() raises FancyExit while a leftover task, which is iterating an
        async generator, raises ZeroDivisionError when cancelled.  Afterwards
        the generator must have no frame and must not be running.
        """
        spinner = None
        lazyboy = None

        class FancyExit(Exception):
            pass

        async def fidget():
            while True:
                yield 1
                await asyncio.sleep(1)

        async def spin():
            nonlocal spinner
            spinner = fidget()
            try:
                async for the_meaning_of_life in spinner:  # NoQA
                    pass
            except asyncio.CancelledError:
                # Deliberately fail while handling the cancellation.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            # Swallow the error report produced for the leftover task.
            loop.call_exception_handler = mock.Mock()

            nonlocal lazyboy
            lazyboy = asyncio.create_task(spin())
            raise FancyExit

        with self.assertRaises(FancyExit):
            asyncio.run(main())

        self.assertTrue(lazyboy.done())

        self.assertIsNone(spinner.ag_frame)
        self.assertFalse(spinner.ag_running)
183