"""Test script for ftplib module."""

# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
# environment

import ftplib
import asyncore
import asynchat
import socket
import StringIO
import errno
import os
try:
    import ssl
except ImportError:
    ssl = None

from unittest import TestCase, SkipTest, skipUnless
from test import test_support
from test.test_support import HOST, HOSTv6
threading = test_support.import_module('threading')

TIMEOUT = 3
# the dummy data returned by server over the data channel when
# RETR, LIST and NLST commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'


class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler used by the dummy FTP server.

    Everything read from the data connection is appended to
    ``baseclass.last_received_data`` (``baseclass`` being the control
    channel handler that created this object) so tests can inspect what
    the client uploaded.
    """

    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every new data connection starts with an empty receive buffer.
        self.baseclass.last_received_data = ''

    def handle_read(self):
        self.baseclass.last_received_data += self.recv(1024)

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        # The flag makes sure the 226 reply is only sent once.
        if not self.dtp_conn_closed:
            self.baseclass.push('226 transfer complete')
            self.close()
            self.dtp_conn_closed = True

    def handle_error(self):
        # Re-raise instead of letting asyncore swallow and log the error.
        raise


class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler implementing a minimal scripted FTP server.

    Each received line is split into a command and its argument and is
    dispatched to the ``cmd_<name>`` method, if one exists; otherwise a
    550 reply is sent.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator("\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        # When set by a test, this reply is pushed before the regular reply
        # to the next command (see found_terminator).
        self.next_response = ''
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        line = ''.join(self.in_buffer)
        self.in_buffer = []
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        cmd = line.split(' ')[0].lower()
        self.last_received_cmd = cmd
        # Everything after the first space is the command argument.
        space = line.find(' ')
        if space != -1:
            arg = line[space + 1:]
        else:
            arg = ""
        if hasattr(self, 'cmd_' + cmd):
            method = getattr(self, 'cmd_' + cmd)
            method(arg)
        else:
            self.push('550 command "%s" not understood.' % cmd)

    def handle_error(self):
        # Re-raise instead of letting asyncore swallow and log the error.
        raise

    def push(self, data):
        # Replies are passed without the line terminator; add it here.
        asynchat.async_chat.push(self, data + '\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2": connect back to the client.
        addr = map(int, arg.split(','))
        ip = '%d.%d.%d.%d' % tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        sock = socket.socket()
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            ip, port = sock.getsockname()[:2]
            ip = ip.replace('.', ',')
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' % (ip, p1, p2))
            conn, addr = sock.accept()
        finally:
            # The listening socket serves exactly one connection; close it
            # so that it does not leak (also when accept() fails).
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # arg is "<d>af<d>ip<d>port<d>" where <d> is the delimiter given
        # by the first character.
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        sock = socket.socket(socket.AF_INET6)
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' % port)
            conn, addr = sock.accept()
        finally:
            # The listening socket serves exactly one connection; close it
            # so that it does not leak (also when accept() fails).
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' % arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # Remembered as received (a string); consumed by the next RETR.
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        if self.rest is not None:
            offset = int(self.rest)
        else:
            offset = 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        # A REST offset only applies to a single transfer.
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')


class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread running the asyncore loop.

    Accepted connections are wrapped in ``handler``; the most recent
    handler is kept in ``handler_instance`` for inspection by the tests.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        # Start the thread and block until run() has flagged it as active.
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # "with" guarantees the lock is released even if a handler
            # re-raises an error out of asyncore.loop().
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        # Ask the loop in run() to terminate and wait for the thread.
        assert self.active
        self.active = False
        self.join()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise instead of letting asyncore swallow and log the error.
        raise


if ssl is not None:

    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")

    class SSLConnection(object, asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL."""

        # True while the (non-blocking) TLS handshake is in progress.
        _ssl_accepting = False
        # True while the (non-blocking) TLS shutdown is in progress.
        _ssl_closing = False

        def secure_connection(self):
            # Wrap the existing socket server-side; the handshake itself is
            # driven later by the read/write events.
            ssl_socket = ssl.wrap_socket(self.socket,
                                         suppress_ragged_eofs=False,
                                         certfile=CERTFILE, server_side=True,
                                         do_handshake_on_connect=False,
                                         ssl_version=ssl.PROTOCOL_SSLv23)
            self.del_channel()
            self.set_socket(ssl_socket)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet; retried on the next I/O event.
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                raise
            except socket.error as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet; retried on the next I/O event.
                    return
            except socket.error:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            # The connection is left open only if a truthy '_ccc' attribute
            # has been set on the instance.
            if getattr(self, '_ccc', False) is False:
                super(SSLConnection, self).close()

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_write_event()

        def send(self, data):
            try:
                return super(SSLConnection, self).send(data)
            except ssl.SSLError as err:
                # Report "nothing sent" for these conditions.
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                                   ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return 0
                raise

        def recv(self, buffer_size):
            try:
                return super(SSLConnection, self).recv(buffer_size)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return b''
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                    self.handle_close()
                    return b''
                raise

        def handle_error(self):
            # Re-raise instead of letting asyncore swallow and log the error.
            raise

        def close(self):
            # Shut the TLS layer down first if one is currently active.
            if (isinstance(self.socket, ssl.SSLSocket) and
                    self.socket._sslobj is not None):
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).close()


    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
        """A DummyDTPHandler subclass supporting TLS/SSL."""

        def __init__(self, conn, baseclass):
            DummyDTPHandler.__init__(self, conn, baseclass)
            if self.baseclass.secure_data_channel:
                self.secure_connection()


    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
        """A DummyFTPHandler subclass supporting TLS/SSL."""

        dtp_handler = DummyTLS_DTPHandler

        def __init__(self, conn):
            DummyFTPHandler.__init__(self, conn)
            self.secure_data_channel = False

        def cmd_auth(self, line):
            """Set up secure control channel."""
            self.push('234 AUTH TLS successful')
            self.secure_connection()

        def cmd_pbsz(self, line):
            """Negotiate size of buffer for secure data transfer.
            For TLS/SSL the only valid value for the parameter is '0'.
            Any other value is accepted but ignored.
            """
            self.push('200 PBSZ=0 successful.')

        def cmd_prot(self, line):
            """Setup un/secure data channel."""
            arg = line.upper()
            if arg == 'C':
                self.push('200 Protection set to Clear')
                self.secure_data_channel = False
            elif arg == 'P':
                self.push('200 Protection set to Private')
                self.secure_data_channel = True
            else:
                self.push("502 Unrecognized PROT type (use C or P).")


    class DummyTLS_FTPServer(DummyFTPServer):
        handler = DummyTLS_FTPHandler


class TestFTPClass(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer over IPv4."""

    def setUp(self):
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=10)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dir = self.client.mkd('/foo')
        self.assertEqual(dir, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        dir = self.client.cwd('/foo')
        self.assertEqual(dir, '250 cwd ok')

    def test_pwd(self):
        dir = self.client.pwd()
        self.assertEqual(dir, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_retrbinary(self):
        received = []
        self.client.retrbinary('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', received.append, rest=rest)
            self.assertEqual(''.join(received), RETR_DATA[rest:],
                             msg='rest test case %d %d %d' % (rest,
                                                              len(''.join(received)),
                                                              len(RETR_DATA[rest:])))

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = StringIO.StringIO(RETR_DATA)
        self.client.storbinary('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        f = StringIO.StringIO(RETR_DATA)
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
        self.client.storlines('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        l = []
        self.client.dir(lambda x: l.append(x))
        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))

    def test_makeport(self):
        self.client.makeport()
        # IPv4 is in use, just make sure send_eprt has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = StringIO.StringIO('x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)


@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer bound to HOSTv6."""

    @classmethod
    def setUpClass(cls):
        # Probe with the very address setUp() is going to bind, and skip
        # the whole class if that is not possible.
        try:
            probe = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        except socket.error:
            raise SkipTest("IPv6 not enabled")
        # The probe is never started; close it so that it neither leaks
        # its socket nor lingers in asyncore.socket_map.
        probe.close()

    def setUp(self):
        self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP()
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        self.client.makeport()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'eprt')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')

    def test_transfer(self):
        def retr():
            received = []
            self.client.retrbinary('retr', received.append)
            self.assertEqual(''.join(received), RETR_DATA)
        self.client.set_pasv(True)
        retr()
        self.client.set_pasv(False)
        retr()


@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()


@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        self.assertIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_auth_ssl(self):
        try:
            self.client.ssl_version = ssl.PROTOCOL_SSLv23
            self.client.auth()
            self.assertRaises(ValueError, self.client.auth)
        finally:
            self.client.ssl_version = ssl.PROTOCOL_TLSv1

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        sock = self.client.transfercmd('list')
        try:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
        finally:
            sock.close()

    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            self.client.transfercmd("list").close()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        self.client.transfercmd("list").close()


class TestTimeouts(TestCase):
    """Check how the timeout is propagated to the FTP control socket.

    A throw-away server thread accepts one connection, sends a single
    reply line and closes.
    """

    def setUp(self):
        self.evt = threading.Event()
        # Saved so that tearDown() can undo the class-level override below.
        self.old_port = ftplib.FTP.port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(10)
        self.port = test_support.bind_port(self.sock)
        self.server_thread = threading.Thread(
            target=self.server, args=(self.evt, self.sock))
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # Make FTP(HOST) connect to the throw-away server by default.
        ftplib.FTP.port = self.port

    def tearDown(self):
        self.evt.wait()
        # Do not leak the port override into tests that run afterwards.
        ftplib.FTP.port = self.old_port
        # The server thread always terminates (accept() has a timeout), so
        # wait for it rather than leaving a dangling thread behind.
        self.server_thread.join()

    def server(self, evt, serv):
        # This method sets the evt 3 times:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        #  3) when we have closed the socket
        serv.listen(5)
        # (1) Signal the caller that we are ready to accept the connection.
        evt.set()
        try:
            conn, addr = serv.accept()
        except socket.timeout:
            pass
        else:
            conn.send("1 Hola mundo\n")
            # (2) Signal the caller that it is safe to close the socket.
            evt.set()
            conn.close()
        finally:
            serv.close()
            # (3) Signal the caller that we are done.
            evt.set()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(ftp.sock.gettimeout())
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutConnect(self):
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()


def test_main():
    tests = [TestFTPClass, TestTimeouts,
             TestIPv6Environment,
             TestTLS_FTPClassMixin, TestTLS_FTPClass]

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        test_support.threading_cleanup(*thread_info)


if __name__ == '__main__':
    test_main()