import asyncio
import time
import threading
import unittest

from test.support import socket_helper
from test.test_asyncio import utils as test_utils
from test.test_asyncio import functional as func_tests


def tearDownModule():
    """Reset the global event loop policy once this module's tests finish."""
    asyncio.set_event_loop_policy(None)


class BaseStartServer(func_tests.FunctionalTestCaseMixin):
    """Shared server start/stop tests, parameterized by the event loop type.

    Concrete subclasses override new_loop().  ``self.loop`` and
    ``self.tcp_client`` are not defined in this file; they are presumably
    provided by FunctionalTestCaseMixin.
    """

    def new_loop(self):
        raise NotImplementedError

    def _assert_server_closed(self, srv):
        # State expected of a server after serve_forever() was cancelled
        # inside an ``async with srv`` block.
        self.assertEqual(srv.sockets, ())

        self.assertIsNone(srv._sockets)
        self.assertIsNone(srv._waiters)
        self.assertFalse(srv.is_serving())

        # A closed server must refuse to start serving again.
        with self.assertRaisesRegex(RuntimeError, r'is closed'):
            self.loop.run_until_complete(srv.serve_forever())

    def test_start_server_1(self):
        HELLO_MSG = b'1' * 1024 * 5 + b'\n'

        def client(sock, addr):
            # Runs outside the event loop: poll until the server reports
            # that it is serving, giving up after 10 attempts.
            for _ in range(10):
                time.sleep(0.2)
                if srv.is_serving():
                    break
            else:
                raise RuntimeError('server did not start serving in time')

            sock.settimeout(2)
            sock.connect(addr)
            sock.send(HELLO_MSG)
            sock.recv_all(1)
            sock.close()

        async def serve(reader, writer):
            await reader.readline()
            # Cancelling main_task interrupts serve_forever(); the reply is
            # still written so that the client's recv_all(1) can complete.
            main_task.cancel()
            writer.write(b'1')
            writer.close()
            await writer.wait_closed()

        async def main(srv):
            async with srv:
                await srv.serve_forever()

        # Passing loop= explicitly is expected to emit a DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            srv = self.loop.run_until_complete(asyncio.start_server(
                serve, socket_helper.HOSTv4, 0,
                loop=self.loop, start_serving=False))

        # start_serving=False: the server exists but is not accepting yet.
        self.assertFalse(srv.is_serving())

        main_task = self.loop.create_task(main(srv))

        addr = srv.sockets[0].getsockname()
        with self.assertRaises(asyncio.CancelledError):
            with self.tcp_client(lambda sock: client(sock, addr)):
                self.loop.run_until_complete(main_task)

        self._assert_server_closed(srv)


class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
    """Run the shared tests, plus a UNIX socket test, on SelectorEventLoop."""

    def new_loop(self):
        return asyncio.SelectorEventLoop()

    @socket_helper.skip_unless_bind_unix_socket
    def test_start_unix_server_1(self):
        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
        started = threading.Event()

        def client(sock, addr):
            sock.settimeout(2)
            # Wait for main() to signal that the server is serving.  Fail
            # explicitly on timeout instead of attempting a connection that
            # cannot succeed and would raise an unrelated socket error.
            if not started.wait(5):
                raise RuntimeError('server did not start serving in time')
            sock.connect(addr)
            sock.send(HELLO_MSG)
            sock.recv_all(1)
            sock.close()

        async def serve(reader, writer):
            await reader.readline()
            # Cancelling main_task interrupts serve_forever(); the reply is
            # still written so that the client's recv_all(1) can complete.
            main_task.cancel()
            writer.write(b'1')
            writer.close()
            await writer.wait_closed()

        async def main(srv):
            async with srv:
                self.assertFalse(srv.is_serving())
                await srv.start_serving()
                self.assertTrue(srv.is_serving())
                started.set()
                await srv.serve_forever()

        with test_utils.unix_socket_path() as addr:
            # Passing loop= explicitly is expected to emit a
            # DeprecationWarning.
            with self.assertWarns(DeprecationWarning):
                srv = self.loop.run_until_complete(asyncio.start_unix_server(
                    serve, addr, loop=self.loop, start_serving=False))

            main_task = self.loop.create_task(main(srv))

            with self.assertRaises(asyncio.CancelledError):
                with self.unix_client(lambda sock: client(sock, addr)):
                    self.loop.run_until_complete(main_task)

            self._assert_server_closed(srv)


@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
    """Run the shared tests on ProactorEventLoop where it is available."""

    def new_loop(self):
        return asyncio.ProactorEventLoop()


if __name__ == '__main__':
    unittest.main()