import socket
import telnetlib
import time
import Queue

import unittest
from unittest import TestCase
from test import test_support
threading = test_support.import_module('threading')

HOST = test_support.HOST
# Unique marker placed in a server script to mean "stop sending".
EOF_sigil = object()


def server(evt, serv, dataq=None):
    """Run a one-shot TCP server on the already-bound socket `serv`.

    1) start listening and set `evt` so the parent knows we are ready
    2) accept a single connection
    3) [optional] if `dataq` is given, fetch one list from dataq.get() and
       play it back on the connection: strings are written to the socket,
       numbers are slept for (seconds), EOF_sigil ends the playback.

    The connection and the listening socket are always closed on exit.
    A socket.timeout (e.g. nobody connected) is swallowed on purpose.
    """
    serv.listen(5)
    evt.set()
    try:
        conn, addr = serv.accept()
        try:
            if dataq:
                data = ''
                new_data = dataq.get(True, 0.5)
                dataq.task_done()
                for item in new_data:
                    # The sentinel is a unique object: compare by identity.
                    if item is EOF_sigil:
                        break
                    if isinstance(item, (int, float)):
                        time.sleep(item)
                    else:
                        data += item
                    # send() may be partial; keep the remainder for later.
                    written = conn.send(data)
                    data = data[written:]
                # Flush whatever a partial send() left behind so that the
                # client really receives everything that was scripted.
                if data:
                    conn.sendall(data)
        finally:
            conn.close()
    except socket.timeout:
        pass
    finally:
        serv.close()


class GeneralTests(TestCase):
    """Connection set-up, timeout handling and simple getters."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.daemon = True
        self.thread.start()
        # Do not connect before the server is listening.
        self.evt.wait()

    def tearDown(self):
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        # Close the client socket even if the assertion below fails.
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.addCleanup(telnet.sock.close)
        self.assertIsNone(telnet.sock.gettimeout())

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testGetters(self):
        # Test telnet getter methods
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        t_sock = telnet.sock
        self.addCleanup(t_sock.close)
        self.assertEqual(telnet.get_socket(), t_sock)
        self.assertEqual(telnet.fileno(), t_sock.fileno())


def _read_setUp(self):
    """Shared setUp: start a server thread that plays back self.dataq.

    Tests put one list (the "script") on self.dataq; server() picks it up
    after accepting the connection and writes it to the client.
    """
    self.evt = threading.Event()
    self.dataq = Queue.Queue()
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.settimeout(10)
    self.port = test_support.bind_port(self.sock)
    self.thread = threading.Thread(target=server,
                                   args=(self.evt, self.sock, self.dataq))
    self.thread.start()
    # Do not connect before the server is listening.
    self.evt.wait()

def _read_tearDown(self):
    """Shared tearDown: wait for the server thread started in setUp."""
    self.thread.join()


class ReadTests(TestCase):
    """Exercise the Telnet.read_*() family against a scripted server."""

    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_read_until_with_poll(self):
        """Use select.poll() to implement telnet.read_until()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        if not telnet._has_poll:
            raise unittest.SkipTest('select.poll() is required')
        telnet._has_poll = True
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_with_select(self):
        """Use select.select() to implement telnet.read_until()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet._has_poll = False
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def _test_blocking(self, func):
        # The server sleeps block_long before closing, so a blocking read
        # must take at least block_short to return.
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertTrue(self.block_short <= time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all()  # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # This must call read_some(), the method under test (the previous
        # code called read_all(), leaving this guarantee untested).
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual('', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        Helper for read_eager() / read_very_eager():
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        # Poll until EOF; everything read so far must be a prefix of the
        # expected payload.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')

    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        # test EOF: once the raw queue has seen the close, the lazy
        # readers must raise EOFError.
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been pulled off the socket yet.
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been pulled off the socket yet.
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # read_very_lazy() does no I/O and no processing itself,
                # so feed and cook the queues by hand.
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')


# Short alias for the telnetlib constants used below.
tl = telnetlib


class nego_collector(object):
    """Option negotiation callback that records everything it is given."""

    def __init__(self, sb_getter=None):
        self.seen = ''
        self.sb_getter = sb_getter
        self.sb_seen = ''

    def do_nego(self, sock, cmd, opt):
        self.seen += cmd + opt
        # At the end of a subnegotiation, collect its payload too.
        if cmd == tl.SE and self.sb_getter:
            sb_data = self.sb_getter()
            self.sb_seen += sb_data


class OptionTests(TestCase):
    """IAC command and subnegotiation handling."""

    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # Each call needs a fresh server, so run the fixture by hand.
        self.setUp()
        self.dataq.put(data)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0)  # we expect at least one command
        self.assertIn(cmd[0], self.cmds)
        self.assertEqual(cmd[1], tl.NOOPT)
        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None  # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
            self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
            self._test_command([tl.IAC + cmd, EOF_sigil])
        # all at once
        self._test_command([tl.IAC + cmd for cmd in self.cmds] + [EOF_sigil])
        self.assertEqual('', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
                tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
                EOF_sigil,
               ]
        self.dataq.put(send)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, '')
        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual('', telnet.read_sb_data())
        nego.sb_getter = None  # break the nego => telnet cycle


class ExpectTests(TestCase):
    """Exercise Telnet.expect() against a scripted server."""

    # Same fixture as ReadTests and OptionTests: a server thread that
    # plays back whatever list the test puts on self.dataq.
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_expect_A(self):
        """
        expect(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        (_,_,data) = telnet.expect(['match'])
        self.assertEqual(data, ''.join(want[:-2]))

    def test_expect_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        (_,_,data) = telnet.expect(['not seen'], self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_expect_with_poll(self):
        """Use select.poll() to implement telnet.expect()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        if not telnet._has_poll:
            raise unittest.SkipTest('select.poll() is required')
        telnet._has_poll = True
        self.dataq.join()
        (_,_,data) = telnet.expect(['match'])
        self.assertEqual(data, ''.join(want[:-2]))

    def test_expect_with_select(self):
        """Use select.select() to implement telnet.expect()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet._has_poll = False
        self.dataq.join()
        (_,_,data) = telnet.expect(['match'])
        self.assertEqual(data, ''.join(want[:-2]))


def test_main(verbose=None):
    test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
                              ExpectTests)

if __name__ == '__main__':
    test_main()