"""Test script for ftplib module."""

# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
# environment

import ftplib
import asyncore
import asynchat
import socket
import io
import errno
import os
import threading
import time
try:
    import ssl
except ImportError:
    ssl = None

from unittest import TestCase, skip, skipUnless
from test import support
from test.support import socket_helper
from test.support.socket_helper import HOST, HOSTv6

TIMEOUT = support.LOOPBACK_TIMEOUT
DEFAULT_ENCODING = 'utf-8'
# the dummy data returned by server over the data channel when
# RETR, LIST, NLST, MLSD commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000 + 'non-ascii char \xAE\r\n'
LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
             "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
             "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
             "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
             "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
             "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
             "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
             "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
             "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
             # the entry name itself starts with a space
             "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
             "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file4\r\n"
             "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n"
             "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n")


class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler owned by a DummyFTPHandler (``baseclass``).

    Received bytes are decoded and accumulated in
    ``baseclass.last_received_data``; outgoing data is encoded with the
    control handler's encoding.
    """

    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # every new data connection starts with an empty receive buffer
        self.baseclass.last_received_data = ''
        self.encoding = baseclass.encoding

    def handle_read(self):
        new_data = self.recv(1024).decode(self.encoding, 'replace')
        self.baseclass.last_received_data += new_data

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if not self.dtp_conn_closed:
            self.baseclass.push('226 transfer complete')
            self.close()
            self.dtp_conn_closed = True

    def push(self, what):
        """Send *what*, or the one-shot ``baseclass.next_data`` override."""
        if self.baseclass.next_data is not None:
            what = self.baseclass.next_data
            self.baseclass.next_data = None
        if not what:
            # nothing to send: just schedule the close
            return self.close_when_done()
        super().push(what.encode(self.encoding))

    def handle_error(self):
        raise Exception


class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler implementing a minimal scripted FTP server.

    Each received line is dispatched to the ``cmd_<name>`` method matching
    its first word; unknown commands get a 550 reply.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn, encoding=DEFAULT_ENCODING):
        asynchat.async_chat.__init__(self, conn)
        # tells the socket to handle urgent data inline (ABOR command)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.next_data = None
        self.rest = None
        self.next_retr_data = RETR_DATA
        # push() encodes with self.encoding, so set it before the welcome
        self.encoding = encoding
        self.push('220 welcome')
        # We use this as the string IPv4 address to direct the client
        # to in response to a PASV command. To test security behavior.
        # https://bugs.python.org/issue43285/.
        self.fake_pasv_server_ip = '252.253.254.255'

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered line and dispatch it to its cmd_* method."""
        line = b''.join(self.in_buffer).decode(self.encoding)
        self.in_buffer = []
        if self.next_response:
            # one-shot canned reply injected by a test; sent before the
            # command's regular reply
            self.push(self.next_response)
            self.next_response = ''
        # first word is the command, everything after the first space
        # (possibly nothing) is its argument
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        raise Exception

    def push(self, data):
        asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2"
        addr = [int(part) for part in arg.split(',')]
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            # advertise the fake address rather than the one we listen on
            ip = self.fake_pasv_server_ip.replace('.', ',')
            # integer high/low bytes of the port
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # the first character of arg is the field delimiter
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0),
                                  family=socket.AF_INET6) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_noop(self, arg):
        self.push('200 noop ok')

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_abor(self, arg):
        self.push('226 abor ok')

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        # honour a preceding REST by skipping that many characters
        offset = int(self.rest) if self.rest is not None else 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_opts(self, arg):
        self.push('200 opts ok')

    def cmd_mlsd(self, arg):
        self.push('125 mlsd ok')
        self.dtp.push(MLSD_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')


class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    The handler created for the most recently accepted connection is kept
    in ``handler_instance`` so tests can inspect and script it.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self.encoding = encoding

    def start(self):
        """Start the thread and block until run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # "with" releases the lock even if the loop iteration raises
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to exit and wait for the thread to finish."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn, encoding=self.encoding)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return False

    def handle_error(self):
        raise Exception


if ssl is not None:

    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")

    class SSLConnection(asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL."""

        _ssl_accepting = False
        _ssl_closing = False

        def secure_connection(self):
            """Wrap the current socket in TLS and start the handshake."""
            context = ssl.SSLContext()
            context.load_cert_chain(CERTFILE)
            # named ssl_sock so the "socket" module is not shadowed
            ssl_sock = context.wrap_socket(self.socket,
                                           suppress_ragged_eofs=False,
                                           server_side=True,
                                           do_handshake_on_connect=False)
            self.del_channel()
            self.set_socket(ssl_sock)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # handshake not finished yet; retried on the next event
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                # TODO: SSLError does not expose alert information
                elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
                    return self.handle_close()
                raise
            except OSError as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # shutdown not finished yet; retried on the next event
                    return
            except OSError:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            # after CCC the channel stays open in clear text
            if getattr(self, '_ccc', False) is False:
                super().close()

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super().handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super().handle_write_event()

        def send(self, data):
            try:
                return super().send(data)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                                   ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return 0
                raise

        def recv(self, buffer_size):
            try:
                return super().recv(buffer_size)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return b''
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                    self.handle_close()
                    return b''
                raise

        def handle_error(self):
            raise Exception

        def close(self):
            if (isinstance(self.socket, ssl.SSLSocket) and
                    self.socket._sslobj is not None):
                self._do_ssl_shutdown()
            else:
                super().close()


    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
        """A DummyDTPHandler subclass supporting TLS/SSL."""

        def __init__(self, conn, baseclass):
            DummyDTPHandler.__init__(self, conn, baseclass)
            if self.baseclass.secure_data_channel:
                self.secure_connection()


    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
        """A DummyFTPHandler subclass supporting TLS/SSL."""

        dtp_handler = DummyTLS_DTPHandler

        def __init__(self, conn, encoding=DEFAULT_ENCODING):
            DummyFTPHandler.__init__(self, conn, encoding=encoding)
            self.secure_data_channel = False
            self._ccc = False

        def cmd_auth(self, line):
            """Set up secure control channel."""
            self.push('234 AUTH TLS successful')
            self.secure_connection()

        def cmd_ccc(self, line):
            self.push('220 Reverting back to clear-text')
            self._ccc = True
            self._do_ssl_shutdown()

        def cmd_pbsz(self, line):
            """Negotiate size of buffer for secure data transfer.
            For TLS/SSL the only valid value for the parameter is '0'.
            Any other value is accepted but ignored.
            """
            self.push('200 PBSZ=0 successful.')

        def cmd_prot(self, line):
            """Setup un/secure data channel."""
            arg = line.upper()
            if arg == 'C':
                self.push('200 Protection set to Clear')
                self.secure_data_channel = False
            elif arg == 'P':
                self.push('200 Protection set to Private')
                self.secure_data_channel = True
            else:
                self.push("502 Unrecognized PROT type (use C or P).")


    class DummyTLS_FTPServer(DummyFTPServer):
        handler = DummyTLS_FTPHandler


class TestFTPClass(TestCase):
    """ftplib.FTP tests run against a DummyFTPServer over IPv4."""

    def setUp(self, encoding=DEFAULT_ENCODING):
        self.server = DummyFTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def check_data(self, received, expected):
        # compare lengths first so a size mismatch is reported on its own
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, OSError,
                      EOFError)
        for x in exceptions:
            # an exception missing from all_errors propagates and fails
            # the test
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dirname = self.client.mkd('/foo')
        self.assertEqual(dirname, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        resp = self.client.cwd('/foo')
        self.assertEqual(resp, '250 cwd ok')

    def test_pwd(self):
        dirname = self.client.pwd()
        self.assertEqual(dirname, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_abort(self):
        self.client.abort()

    def test_retrbinary(self):
        def callback(data):
            received.append(data.decode(self.client.encoding))
        received = []
        self.client.retrbinary('retr', callback)
        self.check_data(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        def callback(data):
            received.append(data.decode(self.client.encoding))
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', callback, rest=rest)
            self.check_data(''.join(received), RETR_DATA[rest:])

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
        self.client.storbinary('stor', f)
        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
        f = io.BytesIO(data)
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
        f = io.BytesIO(data)
        self.client.storlines('stor', f)
        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

        f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
        # storlines() expects a binary file, not a text file
        with support.check_warnings(('', BytesWarning), quiet=True):
            self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lines.append)
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_mlsd(self):
        list(self.client.mlsd())
        list(self.client.mlsd(path='/'))
        list(self.client.mlsd(path='/', facts=['size', 'type']))

        ls = list(self.client.mlsd())
        for name, facts in ls:
            self.assertIsInstance(name, str)
            self.assertIsInstance(facts, dict)
            self.assertTrue(name)
            self.assertIn('type', facts)
            self.assertIn('perm', facts)
            self.assertIn('unique', facts)

        def set_data(data):
            self.server.handler_instance.next_data = data

        def test_entry(line, type=None, perm=None, unique=None, name=None):
            type = 'type' if type is None else type
            perm = 'perm' if perm is None else perm
            unique = 'unique' if unique is None else unique
            name = 'name' if name is None else name
            set_data(line)
            _name, facts = next(self.client.mlsd())
            self.assertEqual(_name, name)
            self.assertEqual(facts['type'], type)
            self.assertEqual(facts['perm'], perm)
            self.assertEqual(facts['unique'], unique)

        # plain
        test_entry('type=type;perm=perm;unique=unique; name\r\n')
        # "=" in fact value
        test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
        test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
        test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
        test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
        # spaces in name
        test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
        test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
        # two spaces: the first separates facts from name, the second is
        # the leading space of the expected name
        test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
        test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e")
        # ";" in name
        test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
        test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
        test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
        test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
        # case sensitiveness
        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
        _name, facts = next(self.client.mlsd())
        for x in facts:
            self.assertTrue(x.islower())
        # no data (directory empty)
        set_data('')
        self.assertRaises(StopIteration, next, self.client.mlsd())
        set_data('')
        for x in self.client.mlsd():
            self.fail("unexpected data %s" % x)

    def test_makeport(self):
        with self.client.makeport():
            # IPv4 is in use, just make sure send_eprt has not been used
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), timeout=TIMEOUT)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd,
                         'pasv')

    def test_makepasv_issue43285_security_disabled(self):
        """Test the opt-in to the old vulnerable behavior."""
        self.client.trust_server_pasv_ipv4_address = True
        bad_host, port = self.client.makepasv()
        self.assertEqual(
                bad_host, self.server.handler_instance.fake_pasv_server_ip)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((self.client.sock.getpeername()[0], port),
                                 timeout=TIMEOUT).close()

    def test_makepasv_issue43285_security_enabled_default(self):
        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
        trusted_host, port = self.client.makepasv()
        self.assertNotEqual(
                trusted_host, self.server.handler_instance.fake_pasv_server_ip)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()

    def test_with_statement(self):
        self.client.quit()

        def is_client_connected():
            if self.client.sock is None:
                return False
            try:
                self.client.sendcmd('noop')
            except (OSError, EOFError):
                return False
            return True

        # base test
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.assertTrue(is_client_connected())
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # QUIT sent inside the with block
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.client.quit()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # force a wrong response code to be sent on QUIT: error_perm
        # is expected and the connection is supposed to be closed
        try:
            with ftplib.FTP(timeout=TIMEOUT) as self.client:
                self.client.connect(self.server.host, self.server.port)
                self.client.sendcmd('noop')
                self.server.handler_instance.next_response = '550 error on quit'
        except ftplib.error_perm as err:
            self.assertEqual(str(err), '550 error on quit')
        else:
            self.fail('Exception not raised')
        # needed to give the threaded server some time to set the attribute
        # which otherwise would still be == 'noop'
        time.sleep(0.1)
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

    def test_source_address(self):
        self.client.quit()
        port = socket_helper.find_unused_port()
        try:
            self.client.connect(self.server.host, self.server.port,
                                source_address=(HOST, port))
            self.assertEqual(self.client.sock.getsockname()[1], port)
            self.client.quit()
        except OSError as e:
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_source_address_passive_connection(self):
        port = socket_helper.find_unused_port()
        self.client.source_address = (HOST, port)
        try:
            with self.client.transfercmd('list') as sock:
                self.assertEqual(sock.getsockname()[1], port)
        except OSError as e:
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_parse257(self):
        self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
        self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
        self.assertEqual(ftplib.parse257('257 ""'), '')
        self.assertEqual(ftplib.parse257('257 "" created'), '')
        self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
        # The 257 response is supposed to include the directory
        # name and in case it contains embedded double-quotes
        # they must be doubled (see RFC-959, chapter 7, appendix 2).
        self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
        self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = io.BytesIO(b'x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)

    def test_encoding_param(self):
        encodings = ['latin-1', 'utf-8']
        for encoding in encodings:
            with self.subTest(encoding=encoding):
                # restart server and client with the encoding under test
                self.tearDown()
                self.setUp(encoding=encoding)
                self.assertEqual(encoding, self.client.encoding)
                self.test_retrbinary()
                self.test_storbinary()
                self.test_retrlines()
                new_dir = self.client.mkd('/non-ascii dir \xAE')
                self.check_data(new_dir, '/non-ascii dir \xAE')
        # Check default encoding
        client = ftplib.FTP(timeout=TIMEOUT)
        self.assertEqual(DEFAULT_ENCODING, client.encoding)


@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """ftplib.FTP tests run against a DummyFTPServer bound to HOSTv6."""

    def setUp(self):
        self.server = DummyFTPServer((HOSTv6, 0),
                                     af=socket.AF_INET6,
                                     encoding=DEFAULT_ENCODING)
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        with self.client.makeport():
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'eprt')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), timeout=TIMEOUT)
        conn.close()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')

    def test_transfer(self):
        def retr():
            def callback(data):
                received.append(data.decode(self.client.encoding))
            received = []
            self.client.retrbinary('retr', callback)
            self.assertEqual(len(''.join(received)), len(RETR_DATA))
            self.assertEqual(''.join(received), RETR_DATA)
        self.client.set_pasv(True)
        retr()
        self.client.set_pasv(False)
        retr()


@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()


@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self, encoding=DEFAULT_ENCODING):
        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIsInstance(sock, ssl.SSLSocket)
            # consume from SSL socket to finalize handshake and avoid
            # "SSLError [SSL] shutdown while in init"
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)

    def test_ccc(self):
        self.assertRaises(ValueError, self.client.ccc)
        self.client.login(secure=True)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.ccc()
        self.assertRaises(ValueError, self.client.sock.unwrap)

    @skip("FIXME: bpo-32706")
    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list") as sock:
                pass
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list") as sock:
            pass


class TestTimeouts(TestCase):

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(20)
        self.port = socket_helper.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.server_thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.server_thread = None

    def server(self):
        # This method sets the evt 3 times:
        # 1) when the connection is ready to be accepted.
        # 2) when it is safe for the caller to close the connection
        # 3) when we have closed the socket
        self.sock.listen()
        # (1) Signal the caller that we are ready to accept the connection.
        self.evt.set()
        try:
            conn, addr = self.sock.accept()
        except socket.timeout:
            pass
        else:
            conn.sendall(b"1 Hola mundo\n")
            conn.shutdown(socket.SHUT_WR)
            # (2) Signal the caller that it is safe to close the socket.
1068 self.evt.set() 1069 conn.close() 1070 finally: 1071 self.sock.close() 1072 1073 def testTimeoutDefault(self): 1074 # default -- use global socket timeout 1075 self.assertIsNone(socket.getdefaulttimeout()) 1076 socket.setdefaulttimeout(30) 1077 try: 1078 ftp = ftplib.FTP(HOST) 1079 finally: 1080 socket.setdefaulttimeout(None) 1081 self.assertEqual(ftp.sock.gettimeout(), 30) 1082 self.evt.wait() 1083 ftp.close() 1084 1085 def testTimeoutNone(self): 1086 # no timeout -- do not use global socket timeout 1087 self.assertIsNone(socket.getdefaulttimeout()) 1088 socket.setdefaulttimeout(30) 1089 try: 1090 ftp = ftplib.FTP(HOST, timeout=None) 1091 finally: 1092 socket.setdefaulttimeout(None) 1093 self.assertIsNone(ftp.sock.gettimeout()) 1094 self.evt.wait() 1095 ftp.close() 1096 1097 def testTimeoutValue(self): 1098 # a value 1099 ftp = ftplib.FTP(HOST, timeout=30) 1100 self.assertEqual(ftp.sock.gettimeout(), 30) 1101 self.evt.wait() 1102 ftp.close() 1103 1104 # bpo-39259 1105 with self.assertRaises(ValueError): 1106 ftplib.FTP(HOST, timeout=0) 1107 1108 def testTimeoutConnect(self): 1109 ftp = ftplib.FTP() 1110 ftp.connect(HOST, timeout=30) 1111 self.assertEqual(ftp.sock.gettimeout(), 30) 1112 self.evt.wait() 1113 ftp.close() 1114 1115 def testTimeoutDifferentOrder(self): 1116 ftp = ftplib.FTP(timeout=30) 1117 ftp.connect(HOST) 1118 self.assertEqual(ftp.sock.gettimeout(), 30) 1119 self.evt.wait() 1120 ftp.close() 1121 1122 def testTimeoutDirectAccess(self): 1123 ftp = ftplib.FTP() 1124 ftp.timeout = 30 1125 ftp.connect(HOST) 1126 self.assertEqual(ftp.sock.gettimeout(), 30) 1127 self.evt.wait() 1128 ftp.close() 1129 1130 1131class MiscTestCase(TestCase): 1132 def test__all__(self): 1133 blacklist = {'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF', 1134 'Error', 'parse150', 'parse227', 'parse229', 'parse257', 1135 'print_line', 'ftpcp', 'test'} 1136 support.check__all__(self, ftplib, blacklist=blacklist) 1137 1138 1139def test_main(): 1140 tests = [TestFTPClass, 
TestTimeouts, 1141 TestIPv6Environment, 1142 TestTLS_FTPClassMixin, TestTLS_FTPClass, 1143 MiscTestCase] 1144 1145 thread_info = support.threading_setup() 1146 try: 1147 support.run_unittest(*tests) 1148 finally: 1149 support.threading_cleanup(*thread_info) 1150 1151 1152if __name__ == '__main__': 1153 test_main() 1154