"""Test script for ftplib module."""

# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
# environment

import ftplib
import asyncore
import asynchat
import socket
import StringIO
import errno
import os
try:
    import ssl
except ImportError:
    ssl = None

from unittest import TestCase, SkipTest, skipUnless
from test import test_support
from test.test_support import HOST, HOSTv6
threading = test_support.import_module('threading')

TIMEOUT = 3
# the dummy data returned by server over the data channel when
# RETR, LIST and NLST commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'


class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler of the dummy FTP server.

    Everything received on the data connection is appended to the
    ``last_received_data`` attribute of the control-channel handler
    passed as *baseclass*.
    """

    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # start every transfer with an empty receive buffer
        self.baseclass.last_received_data = ''

    def handle_read(self):
        self.baseclass.last_received_data += self.recv(1024)

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if not self.dtp_conn_closed:
            self.baseclass.push('226 transfer complete')
            self.close()
            self.dtp_conn_closed = True

    def handle_error(self):
        # re-raise the exception currently being handled instead of
        # letting asyncore log and swallow it
        raise


class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Every received line is dispatched to the ``cmd_<name>`` method
    matching the lower-cased first word of the line; the remainder of
    the line is passed as the single argument.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator("\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        # when non-empty, pushed once before the reply to the next command
        self.next_response = ''
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        """Handle one complete command line received from the client."""
        line = ''.join(self.in_buffer)
        self.in_buffer = []
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        # "<cmd> <arg>"; arg is empty when the line contains no space
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        # re-raise the exception currently being handled
        raise

    def push(self, data):
        """Send *data* to the client followed by CRLF."""
        asynchat.async_chat.push(self, data + '\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2"
        addr = [int(x) for x in arg.split(',')]
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        sock = socket.socket()
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            ip, port = sock.getsockname()[:2]
            ip = ip.replace('.', ',')
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
        finally:
            # the listening socket serves a single connection; close it
            # (also when accept() times out) so it does not leak
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # arg is "<d>af<d>ip<d>port<d>" where <d> is its first character
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        sock = socket.socket(socket.AF_INET6)
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
        finally:
            # the listening socket serves a single connection; close it
            # (also when accept() times out) so it does not leak
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # remembered until the next RETR, which consumes it
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        if self.rest is not None:
            offset = int(self.rest)
        else:
            offset = 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')


class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket of the dummy FTP server, polled in its own thread.

    ``handler_instance`` holds the handler created for the most
    recently accepted connection.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        try:
            self.bind(address)
            self.listen(5)
            self.active = False
            self.active_lock = threading.Lock()
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
        except:
            # unregister the server on bind() error,
            # needed by TestIPv6Environment.setUpClass()
            self.del_channel()
            raise

    def start(self):
        """Start the polling thread and wait until it is running."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # 'with' releases the lock even if a handler raises
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the polling loop to exit and wait for the thread to end."""
        assert self.active
        self.active = False
        self.join()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # re-raise the exception currently being handled
        raise


if ssl is not None:

    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")

    class SSLConnection(object, asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL."""

        # True while the TLS handshake is still in progress
        _ssl_accepting = False
        # True while the TLS shutdown is still in progress
        _ssl_closing = False

        def secure_connection(self):
            """Wrap the connected socket in a server-side SSL socket.

            The handshake is not done here; it is driven by the
            read/write events through _do_ssl_handshake().
            """
            ssl_socket = ssl.wrap_socket(self.socket,
                                         suppress_ragged_eofs=False,
                                         certfile=CERTFILE, server_side=True,
                                         do_handshake_on_connect=False,
                                         ssl_version=ssl.PROTOCOL_SSLv23)
            self.del_channel()
            self.set_socket(ssl_socket)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # not finished yet; retry on the next event
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                raise
            except socket.error as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # not finished yet; retry on the next event
                    return
            except socket.error as err:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            if getattr(self, '_ccc', False) is False:
                super(SSLConnection, self).close()
            else:
                pass

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_write_event()

        def send(self, data):
            try:
                return super(SSLConnection, self).send(data)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                                   ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # report that nothing was sent
                    return 0
                raise

        def recv(self, buffer_size):
            try:
                return super(SSLConnection, self).recv(buffer_size)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return b''
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                    self.handle_close()
                    return b''
                raise

        def handle_error(self):
            # re-raise the exception currently being handled
            raise

        def close(self):
            # an established TLS layer is shut down first; the underlying
            # socket is then closed by _do_ssl_shutdown()
            if (isinstance(self.socket, ssl.SSLSocket) and
                    self.socket._sslobj is not None):
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).close()


    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
        """A DummyDTPHandler subclass supporting TLS/SSL."""

        def __init__(self, conn, baseclass):
            DummyDTPHandler.__init__(self, conn, baseclass)
            if self.baseclass.secure_data_channel:
                self.secure_connection()


    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
        """A DummyFTPHandler subclass supporting TLS/SSL."""

        dtp_handler = DummyTLS_DTPHandler

        def __init__(self, conn):
            DummyFTPHandler.__init__(self, conn)
            self.secure_data_channel = False

        def cmd_auth(self, line):
            """Set up secure control channel."""
            self.push('234 AUTH TLS successful')
            self.secure_connection()

        def cmd_pbsz(self, line):
            """Negotiate size of buffer for secure data transfer.
            For TLS/SSL the only valid value for the parameter is '0'.
            Any other value is accepted but ignored.
            """
            self.push('200 PBSZ=0 successful.')

        def cmd_prot(self, line):
            """Setup un/secure data channel."""
            arg = line.upper()
            if arg == 'C':
                self.push('200 Protection set to Clear')
                self.secure_data_channel = False
            elif arg == 'P':
                self.push('200 Protection set to Private')
                self.secure_data_channel = True
            else:
                self.push("502 Unrecognized PROT type (use C or P).")


    class DummyTLS_FTPServer(DummyFTPServer):
        handler = DummyTLS_FTPHandler


class TestFTPClass(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer bound to HOST."""

    def setUp(self):
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=10)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dir = self.client.mkd('/foo')
        self.assertEqual(dir, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        dir = self.client.cwd('/foo')
        self.assertEqual(dir, '250 cwd ok')

    def test_pwd(self):
        dir = self.client.pwd()
        self.assertEqual(dir, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_retrbinary(self):
        received = []
        self.client.retrbinary('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', received.append, rest=rest)
            self.assertEqual(''.join(received), RETR_DATA[rest:],
                             msg='rest test case %d %d %d' % (rest,
                                                              len(''.join(received)),
                                                              len(RETR_DATA[rest:])))

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = StringIO.StringIO(RETR_DATA)
        self.client.storbinary('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        f = StringIO.StringIO(RETR_DATA)
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
        self.client.storlines('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        l = []
        self.client.dir(lambda x: l.append(x))
        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))

    def test_makeport(self):
        self.client.makeport()
        # IPv4 is in use, just make sure send_eprt has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = StringIO.StringIO('x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)


@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer bound to HOSTv6."""

    @classmethod
    def setUpClass(cls):
        # Probe the very same address/family that setUp() binds, so the
        # class is skipped exactly when setUp() could not succeed.
        try:
            probe = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        except socket.error:
            raise SkipTest("IPv6 not enabled")
        # The probe is never started; close it so that it is removed from
        # asyncore.socket_map and its listening socket is released.
        probe.close()

    def setUp(self):
        self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP()
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        self.client.makeport()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'eprt')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')

    def test_transfer(self):
        def retr():
            received = []
            self.client.retrbinary('retr', received.append)
            self.assertEqual(''.join(received), RETR_DATA)
        self.client.set_pasv(True)
        retr()
        self.client.set_pasv(False)
        retr()


@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()


@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        self.assertIsInstance(sock, ssl.SSLSocket)
        # consume from SSL socket to finalize handshake and avoid
        # "SSLError [SSL] shutdown while in init"
        self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_auth_ssl(self):
        try:
            self.client.ssl_version = ssl.PROTOCOL_SSLv23
            self.client.auth()
            self.assertRaises(ValueError, self.client.auth)
        finally:
            self.client.ssl_version = ssl.PROTOCOL_TLS

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        sock = self.client.transfercmd('list')
        try:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
        finally:
            sock.close()

    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            self.client.transfercmd("list").close()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        self.client.transfercmd("list").close()


class TestTimeouts(TestCase):
    """Check how ftplib.FTP propagates timeouts to its socket."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(10)
        self.port = test_support.bind_port(self.sock)
        self.server_thread = threading.Thread(
            target=self.server, args=(self.evt,self.sock))
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # ftplib.FTP.port is a class attribute: remember the original value
        # so that tearDown() can undo the override below.
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.evt.wait()
        # do not leave the helper thread dangling after the test
        self.server_thread.join()

    def server(self, evt, serv):
        # This method sets the evt 3 times:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        #  3) when we have closed the socket
        serv.listen(5)
        # (1) Signal the caller that we are ready to accept the connection.
        evt.set()
        try:
            conn, addr = serv.accept()
        except socket.timeout:
            pass
        else:
            conn.send("1 Hola mundo\n")
            # (2) Signal the caller that it is safe to close the socket.
            evt.set()
            conn.close()
        finally:
            serv.close()
            # (3) Signal the caller that we are done.
            evt.set()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(ftp.sock.gettimeout())
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutConnect(self):
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()


def test_main():
    """Run every test class of this module under threading setup/cleanup."""
    tests = [TestFTPClass, TestTimeouts,
             TestIPv6Environment,
             TestTLS_FTPClassMixin, TestTLS_FTPClass]

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        test_support.threading_cleanup(*thread_info)


if __name__ == '__main__':
    test_main()