"""Windows-only asyncio tests: proactor event loop, pipes, handles, policies.

The module raises ``unittest.SkipTest`` at import time on any platform other
than ``win32``, before the Windows-specific ``_overlapped`` and ``_winapi``
modules are imported.
"""

import os
import signal
import socket
import sys
import time
import threading
import unittest
from unittest import mock

if sys.platform != 'win32':
    raise unittest.SkipTest('Windows only')

import _overlapped
import _winapi

import asyncio
from asyncio import windows_events
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset the global event loop policy once the module's tests are done."""
    asyncio.set_event_loop_policy(None)


class UpperProto(asyncio.Protocol):
    """Protocol that echoes the received data back upper-cased.

    Incoming chunks are buffered.  As soon as a chunk containing a newline
    arrives, the whole buffer is joined, upper-cased, written back to the
    transport, and the transport is closed.
    """

    def __init__(self):
        self.buf = []
        # Set by connection_made(); defined here so the attribute always
        # exists on the instance.
        self.trans = None

    def connection_made(self, trans):
        self.trans = trans

    def data_received(self, data):
        self.buf.append(data)
        if b'\n' in data:
            self.trans.write(b''.join(self.buf).upper())
            self.trans.close()


class ProactorLoopCtrlC(test_utils.TestCase):
    """Check that SIGINT interrupts a proactor loop's run_forever()."""

    def test_ctrl_c(self):
        # A helper thread raises SIGINT shortly after the loop starts;
        # run_forever() is expected to exit with KeyboardInterrupt.

        def SIGINT_after_delay():
            time.sleep(0.1)
            signal.raise_signal(signal.SIGINT)

        thread = threading.Thread(target=SIGINT_after_delay)
        # Create the loop explicitly instead of going through
        # asyncio.get_event_loop(): this keeps the test independent of the
        # ambient policy / current-loop state and matches what
        # ProactorTests.setUp() does.
        loop = asyncio.ProactorEventLoop()
        try:
            # only start the thread once the event loop is running
            loop.call_soon(thread.start)
            loop.run_forever()
            self.fail("should not fall through 'run_forever'")
        except KeyboardInterrupt:
            pass
        finally:
            self.close_loop(loop)
        thread.join()


class ProactorMultithreading(test_utils.TestCase):
    """Check that an event loop can be run and closed off the main thread."""

    def test_run_from_nonmain_thread(self):
        # 'finished' is only set after loop.close() returned in the worker
        # thread, so it stays False if anything in func() raised.
        finished = False

        async def coro():
            await asyncio.sleep(0)

        def func():
            nonlocal finished
            loop = asyncio.new_event_loop()
            loop.run_until_complete(coro())
            # close() must not call signal.set_wakeup_fd()
            loop.close()
            finished = True

        thread = threading.Thread(target=func)
        thread.start()
        thread.join()
        self.assertTrue(finished)


class ProactorTests(test_utils.TestCase):
    """Tests run against a fresh ProactorEventLoop created in setUp()."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)

    def test_close(self):
        # Closing the transport wrapping one end of a socket pair must make
        # a pending recv on the other end complete with b''.
        a, b = socket.socketpair()
        # 'with' guarantees that b is closed even when the test fails.
        with b:
            trans = self.loop._make_socket_transport(a, asyncio.Protocol())
            f = asyncio.ensure_future(self.loop.sock_recv(b, 100),
                                      loop=self.loop)
            trans.close()
            self.loop.run_until_complete(f)
            self.assertEqual(f.result(), b'')

    def test_double_bind(self):
        # Creating a second PipeServer on the same address must fail with
        # PermissionError.
        ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
        server1 = windows_events.PipeServer(ADDRESS)
        try:
            with self.assertRaises(PermissionError):
                windows_events.PipeServer(ADDRESS)
        finally:
            # Release the pipe even if the assertion above fails.
            server1.close()

    def test_pipe(self):
        res = self.loop.run_until_complete(self._test_pipe())
        self.assertEqual(res, 'done')

    async def _test_pipe(self):
        # Round-trip test over a named pipe served by UpperProto; returns
        # 'done' when every step succeeded.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        # Nothing is serving the pipe yet, so connecting must fail.
        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = await self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        clients = []
        for _ in range(5):
            stream_reader = asyncio.StreamReader(loop=self.loop)
            protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                    loop=self.loop)
            trans, proto = await self.loop.create_pipe_connection(
                lambda: protocol, ADDRESS)
            self.assertIsInstance(trans, asyncio.Transport)
            self.assertEqual(protocol, proto)
            clients.append((stream_reader, trans))

        # Send one distinct line per client first, then read the replies.
        for i, (r, w) in enumerate(clients):
            w.write('lower-{}\n'.format(i).encode())

        for i, (r, w) in enumerate(clients):
            response = await r.readline()
            self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
            w.close()

        server.close()

        # Once the server is closed, connecting must fail again.
        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'

    def test_connect_pipe_cancel(self):
        # _overlapped.ConnectPipe is patched to always raise an OSError
        # whose winerror is ERROR_PIPE_BUSY.
        exc = OSError()
        exc.winerror = _overlapped.ERROR_PIPE_BUSY
        with mock.patch.object(_overlapped, 'ConnectPipe',
                               side_effect=exc):
            coro = self.loop._proactor.connect_pipe('pipe_address')
            task = self.loop.create_task(coro)

            # check that it's possible to cancel connect_pipe()
            task.cancel()
            with self.assertRaises(asyncio.CancelledError):
                self.loop.run_until_complete(task)

    def test_wait_for_handle(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with 0.5s timeout;
        # result should be False at timeout
        fut = self.loop._proactor.wait_for_handle(event, 0.5)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, False)
        self.assertFalse(fut.result())
        # bpo-31008: Tolerate only 450 ms (at least 500 ms expected),
        # because of bad clock resolution on Windows
        self.assertTrue(0.45 <= elapsed <= 0.9, elapsed)

        _overlapped.SetEvent(event)

        # Wait for set event;
        # result should be True immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, True)
        self.assertTrue(fut.result())
        self.assertTrue(0 <= elapsed < 0.3, elapsed)

        # asyncio issue #195: cancelling a done _WaitHandleFuture
        # must not crash
        fut.cancel()

    def test_wait_for_handle_cancel(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with a cancelled future;
        # CancelledError should be raised immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        fut.cancel()
        start = self.loop.time()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start
        self.assertTrue(0 <= elapsed < 0.1, elapsed)

        # asyncio issue #195: cancelling a _WaitHandleFuture twice
        # must not crash
        fut = self.loop._proactor.wait_for_handle(event)
        fut.cancel()
        fut.cancel()


class WinPolicyTests(test_utils.TestCase):
    """Check which loop class asyncio.run() uses under each Windows policy.

    Each test installs a policy, runs a coroutine that inspects the running
    loop, and restores the previous policy afterwards.
    """

    def test_selector_win_policy(self):
        async def main():
            self.assertIsInstance(
                asyncio.get_running_loop(),
                asyncio.SelectorEventLoop)

        old_policy = asyncio.get_event_loop_policy()
        try:
            asyncio.set_event_loop_policy(
                asyncio.WindowsSelectorEventLoopPolicy())
            asyncio.run(main())
        finally:
            asyncio.set_event_loop_policy(old_policy)

    def test_proactor_win_policy(self):
        async def main():
            self.assertIsInstance(
                asyncio.get_running_loop(),
                asyncio.ProactorEventLoop)

        old_policy = asyncio.get_event_loop_policy()
        try:
            asyncio.set_event_loop_policy(
                asyncio.WindowsProactorEventLoopPolicy())
            asyncio.run(main())
        finally:
            asyncio.set_event_loop_policy(old_policy)


if __name__ == '__main__':
    unittest.main()