• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncio
2import time
3import threading
4import unittest
5
6from test.support import socket_helper
7from test.test_asyncio import utils as test_utils
8from test.test_asyncio import functional as func_tests
9
10
11def tearDownModule():
12    asyncio.set_event_loop_policy(None)
13
14
15class BaseStartServer(func_tests.FunctionalTestCaseMixin):
16
17    def new_loop(self):
18        raise NotImplementedError
19
20    def test_start_server_1(self):
21        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
22
23        def client(sock, addr):
24            for i in range(10):
25                time.sleep(0.2)
26                if srv.is_serving():
27                    break
28            else:
29                raise RuntimeError
30
31            sock.settimeout(2)
32            sock.connect(addr)
33            sock.send(HELLO_MSG)
34            sock.recv_all(1)
35            sock.close()
36
37        async def serve(reader, writer):
38            await reader.readline()
39            main_task.cancel()
40            writer.write(b'1')
41            writer.close()
42            await writer.wait_closed()
43
44        async def main(srv):
45            async with srv:
46                await srv.serve_forever()
47
48        srv = self.loop.run_until_complete(asyncio.start_server(
49            serve, socket_helper.HOSTv4, 0, start_serving=False))
50
51        self.assertFalse(srv.is_serving())
52
53        main_task = self.loop.create_task(main(srv))
54
55        addr = srv.sockets[0].getsockname()
56        with self.assertRaises(asyncio.CancelledError):
57            with self.tcp_client(lambda sock: client(sock, addr)):
58                self.loop.run_until_complete(main_task)
59
60        self.assertEqual(srv.sockets, ())
61
62        self.assertIsNone(srv._sockets)
63        self.assertIsNone(srv._waiters)
64        self.assertFalse(srv.is_serving())
65
66        with self.assertRaisesRegex(RuntimeError, r'is closed'):
67            self.loop.run_until_complete(srv.serve_forever())
68
69
70class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
71
72    def new_loop(self):
73        return asyncio.SelectorEventLoop()
74
75    @socket_helper.skip_unless_bind_unix_socket
76    def test_start_unix_server_1(self):
77        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
78        started = threading.Event()
79
80        def client(sock, addr):
81            sock.settimeout(2)
82            started.wait(5)
83            sock.connect(addr)
84            sock.send(HELLO_MSG)
85            sock.recv_all(1)
86            sock.close()
87
88        async def serve(reader, writer):
89            await reader.readline()
90            main_task.cancel()
91            writer.write(b'1')
92            writer.close()
93            await writer.wait_closed()
94
95        async def main(srv):
96            async with srv:
97                self.assertFalse(srv.is_serving())
98                await srv.start_serving()
99                self.assertTrue(srv.is_serving())
100                started.set()
101                await srv.serve_forever()
102
103        with test_utils.unix_socket_path() as addr:
104            srv = self.loop.run_until_complete(asyncio.start_unix_server(
105                serve, addr, start_serving=False))
106
107            main_task = self.loop.create_task(main(srv))
108
109            with self.assertRaises(asyncio.CancelledError):
110                with self.unix_client(lambda sock: client(sock, addr)):
111                    self.loop.run_until_complete(main_task)
112
113            self.assertEqual(srv.sockets, ())
114
115            self.assertIsNone(srv._sockets)
116            self.assertIsNone(srv._waiters)
117            self.assertFalse(srv.is_serving())
118
119            with self.assertRaisesRegex(RuntimeError, r'is closed'):
120                self.loop.run_until_complete(srv.serve_forever())
121
122
123@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
124class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
125
126    def new_loop(self):
127        return asyncio.ProactorEventLoop()
128
129
130if __name__ == '__main__':
131    unittest.main()
132