# test asynchat

from test import support
from test.support import socket_helper
from test.support import threading_helper

import errno
import socket
import sys
import threading
import time
import unittest
import unittest.mock

import warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore', DeprecationWarning)
    import asynchat
    import asyncore

HOST = socket_helper.HOST
SERVER_QUIT = b'QUIT\n'


class echo_server(threading.Thread):
    """Single-connection server thread that echoes back what it received.

    The server accepts one client, reads until SERVER_QUIT is seen (or the
    client closes), strips SERVER_QUIT, and then sends the collected bytes
    back ``chunk_size`` bytes at a time.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        threading.Thread.__init__(self)
        # Set by run() once the socket is listening.
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.sock)
        # Data received from the client that has not been echoed back yet.
        self.buffer = b""
        # This will be set if the client wants us to wait before echoing
        # data back.
        self.start_resend_event = None

    def run(self):
        """Serve exactly one client, always closing both sockets on exit."""
        try:
            self.sock.listen()
            self.event.set()
            conn, _client = self.sock.accept()
            try:
                self._collect_and_echo(conn)
            finally:
                conn.close()
        finally:
            self.sock.close()

    def _collect_and_echo(self, conn):
        """Read from *conn* until SERVER_QUIT, then echo the data back."""
        # collect data until quit message is seen
        while SERVER_QUIT not in self.buffer:
            data = conn.recv(1)
            if not data:
                break
            self.buffer += data

        # remove the SERVER_QUIT message
        self.buffer = self.buffer.replace(SERVER_QUIT, b'')

        if self.start_resend_event:
            self.start_resend_event.wait()

        # re-send entire set of collected data
        try:
            # this may fail on some tests, such as test_close_when_done,
            # since the client closes the channel when it's done sending
            while self.buffer:
                n = conn.send(self.buffer[:self.chunk_size])
                time.sleep(0.001)
                self.buffer = self.buffer[n:]
        except OSError:
            # Best effort only: the peer may already be gone.  Whatever was
            # not sent stays in self.buffer for the test to inspect.
            pass


class echo_client(asynchat.async_chat):
    """async_chat client that records every terminated message it receives.

    Completed messages are appended to ``contents``; bytes received since the
    last terminator are held in ``buffer``.
    """

    def __init__(self, terminator, server_port):
        asynchat.async_chat.__init__(self)
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, server_port))
        self.set_terminator(terminator)
        self.buffer = b""

    def handle_connect(self):
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        self.buffer += data

    def found_terminator(self):
        self.contents.append(self.buffer)
        self.buffer = b""


def start_echo_server(chunk_size=None):
    """Start an echo_server thread and wait until it is listening.

    If *chunk_size* is given, it overrides the server's ``chunk_size``
    before the thread is started.  Returns ``(server, event)`` where *event*
    is the (already cleared) event the server used to signal readiness.
    """
    event = threading.Event()
    s = echo_server(event)
    if chunk_size is not None:
        s.chunk_size = chunk_size
    s.start()
    event.wait()
    event.clear()
    time.sleep(0.01)   # Give server time to start accepting.
    return s, event


class TestAsynchat(unittest.TestCase):
    """End-to-end async_chat tests against a threaded echo server."""

    usepoll = False

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)

    def line_terminator_check(self, term, server_chunk):
        """Push two *term*-terminated messages and check they come back.

        *server_chunk* is the number of bytes the server sends per call.
        """
        s, event = start_echo_server(chunk_size=server_chunk)
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk_size in (1, 2, 3):
            with self.subTest(chunk_size=chunk_size):
                self.line_terminator_check(b'\n', chunk_size)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk_size in (1, 2, 3):
            with self.subTest(chunk_size=chunk_size):
                self.line_terminator_check(b'\r\n', chunk_size)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk_size in (1, 2, 3):
            with self.subTest(chunk_size=chunk_size):
                self.line_terminator_check(b'qqq', chunk_size)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that integer terminators work (the terminator type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message boundary is ever found: all
        # received data stays in the buffer and contents remains empty.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])


class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run every TestAsynchat test with asyncore.loop(use_poll=True)."""

    usepoll = True


class TestAsynchatMocked(unittest.TestCase):
    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        sock = unittest.mock.Mock()
        sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        dispatcher = asynchat.async_chat()
        dispatcher.set_socket(sock)
        self.addCleanup(dispatcher.del_channel)

        with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
            dispatcher.handle_read()
        self.assertFalse(error.called)


class TestHelperFunctions(unittest.TestCase):
    def test_find_prefix_at_end(self):
        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)


class TestNotConnected(unittest.TestCase):
    def test_disallow_negative_terminator(self):
        # Issue #11259
        client = asynchat.async_chat()
        self.assertRaises(ValueError, client.set_terminator, -1)


if __name__ == "__main__":
    unittest.main()