• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import asyncore
8import asynchat
9import socket
10import io
11import errno
12import os
13import threading
14import time
15try:
16    import ssl
17except ImportError:
18    ssl = None
19
20from unittest import TestCase, skipUnless
21from test import support
22from test.support import socket_helper
23from test.support.socket_helper import HOST, HOSTv6
24
# Timeout applied to every socket these tests open: it is passed to
# ftplib.FTP(), socket.create_connection() and the passive-mode listeners.
TIMEOUT = support.LOOPBACK_TIMEOUT
# Encoding used by the dummy server, its handlers and the test client
# whenever a test does not ask for a different one.
DEFAULT_ENCODING = 'utf-8'
# Canned payloads the dummy server pushes over the data channel in reply to
# the RETR, LIST, NLST and MLSD commands.  Every payload finishes with at
# least one line holding a non-ASCII character (U+00AE) so that the
# encoding handling gets exercised as well.
RETR_DATA = ''.join(['abcde12345\r\n'] * 1000) + 'non-ascii char \xAE\r\n'
LIST_DATA = ('foo\r\n'
             'bar\r\n'
             ' non-ascii char \xAE\r\n')
NLST_DATA = ('foo\r\n'
             'bar\r\n'
             ' non-ascii char \xAE\r\n')
# One MLSD entry per element: "<fact>=<value>;...; <name>\r\n".
MLSD_DATA = ''.join((
    "type=cdir;perm=el;unique==keVO1+ZF4; test\r\n",
    "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n",
    "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n",
    "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n",
    "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n",
    "type=file;perm=awr;unique==keVO1+8G4; writable\r\n",
    "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n",
    "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n",
    "type=file;perm=r;unique==keVO1+EG4; two words\r\n",
    "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file1\r\n",
    "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file2\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file3\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file4\r\n",
    "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n",
    "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n",
))
49
50
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler of the dummy FTP server.

    *baseclass* is the control-channel handler that created this object.
    Everything received on the data channel is decoded and appended to
    ``baseclass.last_received_data``; outgoing payloads are encoded with
    the control channel's encoding.
    """

    # Set once handle_close() has run, so repeated calls become no-ops.
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        super().__init__(conn)
        self.baseclass = baseclass
        self.encoding = baseclass.encoding
        # Every new data connection starts with an empty receive log.
        baseclass.last_received_data = ''

    def handle_read(self):
        chunk = self.recv(1024)
        text = chunk.decode(self.encoding, 'replace')
        self.baseclass.last_received_data += text

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def push(self, what):
        # A payload queued by a test on the control handler takes
        # precedence over *what* and is consumed by this call.
        control = self.baseclass
        queued = control.next_data
        if queued is not None:
            what, control.next_data = queued, None
        if what:
            super().push(what.encode(self.encoding))
            return None
        # Nothing to send: just finish the transfer.
        return self.close_when_done()

    def handle_error(self):
        raise Exception()
83
84
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Each CRLF-terminated line received from the client is split into a
    command word and its argument and dispatched to the ``cmd_<command>``
    method; commands without such a method get a 550 reply.

    Hooks used by the tests:

    * ``last_received_cmd`` -- lower-cased name of the last command seen.
    * ``last_received_data`` -- text received on the data channel.
    * ``next_response`` -- if non-empty, pushed (once) before the next
      command is dispatched.
    * ``next_data`` -- if not None, replaces the next data-channel payload.
    * ``next_retr_data`` -- payload returned by RETR.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn, encoding=DEFAULT_ENCODING):
        asynchat.async_chat.__init__(self, conn)
        # tells the socket to handle urgent data inline (ABOR command)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
        self.set_terminator(b"\r\n")
        # Must be assigned before the first push(): push() encodes with
        # self.encoding and would otherwise fall back to the class-level
        # default inherited from async_chat.
        self.encoding = encoding
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.next_data = None
        self.rest = None
        self.next_retr_data = RETR_DATA
        # We use this as the string IPv4 address to direct the client
        # to in response to a PASV command.  To test security behavior.
        # https://bugs.python.org/issue43285/.
        self.fake_pasv_server_ip = '252.253.254.255'
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        """Handle one complete command line received from the client."""
        line = b''.join(self.in_buffer).decode(self.encoding)
        self.in_buffer = []
        if self.next_response:
            # A reply injected by a test is sent in addition to (and before)
            # the regular reply of the command.
            self.push(self.next_response)
            self.next_response = ''
        # "<cmd> <arg>"; arg is '' when the line contains no space.
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        raise Exception

    def push(self, data):
        """Send the reply line *data* (a str) followed by CRLF."""
        asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2"
        addr = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            # Deliberately advertise the fake address, not the real one.
            ip = self.fake_pasv_server_ip.replace('.', ',')
            # Integer high/low bytes of the port number.
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # arg is "<d>af<d>ip<d>port<d>" where <d> is its first character.
        _af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0),
                                  family=socket.AF_INET6) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_noop(self, arg):
        self.push('200 noop ok')

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_abor(self, arg):
        self.push('226 abor ok')

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # Remembered as received (a str); consumed by the next RETR.
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        offset = int(self.rest) if self.rest is not None else 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_opts(self, arg):
        self.push('200 opts ok')

    def cmd_mlsd(self, arg):
        self.push('125 mlsd ok')
        self.dtp.push(MLSD_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
260
261
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket of the dummy FTP server, driven by its own thread.

    The constructor binds and listens on *address*; ``host`` and ``port``
    then hold the address actually bound.  start() launches a daemon thread
    running the asyncore loop, stop() asks that loop to end and joins the
    thread.  The handler created for the most recently accepted connection
    is kept in ``handler_instance`` so tests can inspect it.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self.encoding = encoding

    def start(self):
        """Start the server thread and wait until its loop is running."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # "with" guarantees the lock is released even when a handler
            # raises out of asyncore.loop() (every handle_error() here does).
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the serving loop to finish and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn, encoding=self.encoding)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return False

    def handle_error(self):
        raise Exception
311
312
313if ssl is not None:
314
    # PEM file expected next to this module; passed on its own to
    # load_cert_chain() by SSLConnection.secure_connection().
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    # Second PEM file expected next to this module.
    # NOTE(review): not referenced in the part of the file visible here --
    # presumably the CA certificate for client-side TLS tests; confirm.
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
317
318    class SSLConnection(asyncore.dispatcher):
319        """An asyncore.dispatcher subclass supporting TLS/SSL."""
320
321        _ssl_accepting = False
322        _ssl_closing = False
323
324        def secure_connection(self):
325            context = ssl.SSLContext()
326            context.load_cert_chain(CERTFILE)
327            socket = context.wrap_socket(self.socket,
328                                         suppress_ragged_eofs=False,
329                                         server_side=True,
330                                         do_handshake_on_connect=False)
331            self.del_channel()
332            self.set_socket(socket)
333            self._ssl_accepting = True
334
335        def _do_ssl_handshake(self):
336            try:
337                self.socket.do_handshake()
338            except ssl.SSLError as err:
339                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
340                                   ssl.SSL_ERROR_WANT_WRITE):
341                    return
342                elif err.args[0] == ssl.SSL_ERROR_EOF:
343                    return self.handle_close()
344                # TODO: SSLError does not expose alert information
345                elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
346                    return self.handle_close()
347                raise
348            except OSError as err:
349                if err.args[0] == errno.ECONNABORTED:
350                    return self.handle_close()
351            else:
352                self._ssl_accepting = False
353
354        def _do_ssl_shutdown(self):
355            self._ssl_closing = True
356            try:
357                self.socket = self.socket.unwrap()
358            except ssl.SSLError as err:
359                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
360                                   ssl.SSL_ERROR_WANT_WRITE):
361                    return
362            except OSError:
363                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
364                # from OpenSSL's SSL_shutdown(), corresponding to a
365                # closed socket condition. See also:
366                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
367                pass
368            self._ssl_closing = False
369            if getattr(self, '_ccc', False) is False:
370                super(SSLConnection, self).close()
371            else:
372                pass
373
374        def handle_read_event(self):
375            if self._ssl_accepting:
376                self._do_ssl_handshake()
377            elif self._ssl_closing:
378                self._do_ssl_shutdown()
379            else:
380                super(SSLConnection, self).handle_read_event()
381
382        def handle_write_event(self):
383            if self._ssl_accepting:
384                self._do_ssl_handshake()
385            elif self._ssl_closing:
386                self._do_ssl_shutdown()
387            else:
388                super(SSLConnection, self).handle_write_event()
389
390        def send(self, data):
391            try:
392                return super(SSLConnection, self).send(data)
393            except ssl.SSLError as err:
394                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
395                                   ssl.SSL_ERROR_WANT_READ,
396                                   ssl.SSL_ERROR_WANT_WRITE):
397                    return 0
398                raise
399
400        def recv(self, buffer_size):
401            try:
402                return super(SSLConnection, self).recv(buffer_size)
403            except ssl.SSLError as err:
404                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
405                                   ssl.SSL_ERROR_WANT_WRITE):
406                    return b''
407                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
408                    self.handle_close()
409                    return b''
410                raise
411
412        def handle_error(self):
413            raise Exception
414
415        def close(self):
416            if (isinstance(self.socket, ssl.SSLSocket) and
417                    self.socket._sslobj is not None):
418                self._do_ssl_shutdown()
419            else:
420                super(SSLConnection, self).close()
421
422
423    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
424        """A DummyDTPHandler subclass supporting TLS/SSL."""
425
426        def __init__(self, conn, baseclass):
427            DummyDTPHandler.__init__(self, conn, baseclass)
428            if self.baseclass.secure_data_channel:
429                self.secure_connection()
430
431
432    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
433        """A DummyFTPHandler subclass supporting TLS/SSL."""
434
435        dtp_handler = DummyTLS_DTPHandler
436
437        def __init__(self, conn, encoding=DEFAULT_ENCODING):
438            DummyFTPHandler.__init__(self, conn, encoding=encoding)
439            self.secure_data_channel = False
440            self._ccc = False
441
442        def cmd_auth(self, line):
443            """Set up secure control channel."""
444            self.push('234 AUTH TLS successful')
445            self.secure_connection()
446
447        def cmd_ccc(self, line):
448            self.push('220 Reverting back to clear-text')
449            self._ccc = True
450            self._do_ssl_shutdown()
451
452        def cmd_pbsz(self, line):
453            """Negotiate size of buffer for secure data transfer.
454            For TLS/SSL the only valid value for the parameter is '0'.
455            Any other value is accepted but ignored.
456            """
457            self.push('200 PBSZ=0 successful.')
458
459        def cmd_prot(self, line):
460            """Setup un/secure data channel."""
461            arg = line.upper()
462            if arg == 'C':
463                self.push('200 Protection set to Clear')
464                self.secure_data_channel = False
465            elif arg == 'P':
466                self.push('200 Protection set to Private')
467                self.secure_data_channel = True
468            else:
469                self.push("502 Unrecognized PROT type (use C or P).")
470
471
    class DummyTLS_FTPServer(DummyFTPServer):
        """DummyFTPServer that serves connections with DummyTLS_FTPHandler."""
        handler = DummyTLS_FTPHandler
474
475
class TestFTPClass(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer listening on HOST.

    setUp() starts a fresh server thread and connects a client to it for
    every test; tearDown() closes the client and stops the server.  Tests
    reach the server-side state through ``self.server.handler_instance``.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        # *encoding* is only passed explicitly by test_encoding_param().
        self.server = DummyFTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def check_data(self, received, expected):
        # The length is compared first so that a size mismatch is reported
        # without dumping two large strings.
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        # Line breaks inside a command must be rejected before sending.
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
        # The server echoes the argument back, so it acts as the reply code.
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, OSError,
                      EOFError)
        for x in exceptions:
            # An exception missing from all_errors propagates and makes the
            # test error out.
            with self.assertRaises(ftplib.all_errors):
                raise x('exception not included in all_errors set')

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dirname = self.client.mkd('/foo')
        self.assertEqual(dirname, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        resp = self.client.cwd('/foo')
        self.assertEqual(resp, '250 cwd ok')

    def test_pwd(self):
        dirname = self.client.pwd()
        self.assertEqual(dirname, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertIsNone(self.client.sock)

    def test_abort(self):
        self.client.abort()

    def test_retrbinary(self):
        def callback(data):
            received.append(data.decode(self.client.encoding))
        received = []
        self.client.retrbinary('retr', callback)
        self.check_data(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        def callback(data):
            received.append(data.decode(self.client.encoding))
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', callback, rest=rest)
            self.check_data(''.join(received), RETR_DATA[rest:])

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
        self.client.storbinary('stor', f)
        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
        f = io.BytesIO(data)
        # rest may be given as an int or as a str; the server stores the
        # REST argument as the string it received.
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
        f = io.BytesIO(data)
        self.client.storlines('stor', f)
        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

        f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
        # storlines() expects a binary file, not a text file
        with support.check_warnings(('', BytesWarning), quiet=True):
            self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lines.append)
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_mlsd(self):
        list(self.client.mlsd())
        list(self.client.mlsd(path='/'))
        list(self.client.mlsd(path='/', facts=['size', 'type']))

        ls = list(self.client.mlsd())
        for name, facts in ls:
            self.assertIsInstance(name, str)
            self.assertIsInstance(facts, dict)
            self.assertTrue(name)
            self.assertIn('type', facts)
            self.assertIn('perm', facts)
            self.assertIn('unique', facts)

        def set_data(data):
            # Payload the server will send for the next MLSD.
            self.server.handler_instance.next_data = data

        def check_entry(line, **expected):
            # Serve *line* as the only MLSD entry and compare the parsed
            # result; every value not given in *expected* defaults to the
            # name of its own field.
            wanted = {'type': 'type', 'perm': 'perm',
                      'unique': 'unique', 'name': 'name'}
            wanted.update(expected)
            set_data(line)
            _name, facts = next(self.client.mlsd())
            self.assertEqual(_name, wanted['name'])
            self.assertEqual(facts['type'], wanted['type'])
            self.assertEqual(facts['perm'], wanted['perm'])
            self.assertEqual(facts['unique'], wanted['unique'])

        # plain
        check_entry('type=type;perm=perm;unique=unique; name\r\n')
        # "=" in fact value
        check_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
        check_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
        check_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
        check_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
        # spaces in name
        check_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
        check_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
        check_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
        check_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
        # ";" in name
        check_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
        check_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
        check_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
        check_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
        # case sensitiveness
        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
        _name, facts = next(self.client.mlsd())
        for x in facts:
            self.assertTrue(x.islower())
        # no data (directory empty)
        set_data('')
        self.assertRaises(StopIteration, next, self.client.mlsd())
        set_data('')
        for x in self.client.mlsd():
            self.fail("unexpected data %s" % x)

    def test_makeport(self):
        with self.client.makeport():
            # IPv4 is in use, just make sure send_eprt has not been used
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                                'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), timeout=TIMEOUT)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_makepasv_issue43285_security_disabled(self):
        """Test the opt-in to the old vulnerable behavior."""
        self.client.trust_server_pasv_ipv4_address = True
        bad_host, port = self.client.makepasv()
        self.assertEqual(
                bad_host, self.server.handler_instance.fake_pasv_server_ip)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((self.client.sock.getpeername()[0], port),
                                 timeout=TIMEOUT).close()

    def test_makepasv_issue43285_security_enabled_default(self):
        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
        trusted_host, port = self.client.makepasv()
        self.assertNotEqual(
                trusted_host, self.server.handler_instance.fake_pasv_server_ip)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()

    def test_with_statement(self):
        self.client.quit()

        def is_client_connected():
            if self.client.sock is None:
                return False
            try:
                self.client.sendcmd('noop')
            except (OSError, EOFError):
                return False
            return True

        # base test
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.assertTrue(is_client_connected())
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # QUIT sent inside the with block
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.client.quit()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # force a wrong response code to be sent on QUIT: error_perm
        # is expected and the connection is supposed to be closed
        try:
            with ftplib.FTP(timeout=TIMEOUT) as self.client:
                self.client.connect(self.server.host, self.server.port)
                self.client.sendcmd('noop')
                self.server.handler_instance.next_response = '550 error on quit'
        except ftplib.error_perm as err:
            self.assertEqual(str(err), '550 error on quit')
        else:
            self.fail('Exception not raised')
        # needed to give the threaded server some time to set the attribute
        # which otherwise would still be == 'noop'
        time.sleep(0.1)
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

    def test_source_address(self):
        self.client.quit()
        port = socket_helper.find_unused_port()
        try:
            self.client.connect(self.server.host, self.server.port,
                                source_address=(HOST, port))
            self.assertEqual(self.client.sock.getsockname()[1], port)
            self.client.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_source_address_passive_connection(self):
        port = socket_helper.find_unused_port()
        self.client.source_address = (HOST, port)
        try:
            with self.client.transfercmd('list') as sock:
                self.assertEqual(sock.getsockname()[1], port)
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_parse257(self):
        self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
        self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
        self.assertEqual(ftplib.parse257('257 ""'), '')
        self.assertEqual(ftplib.parse257('257 "" created'), '')
        self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
        # The 257 response is supposed to include the directory
        # name and in case it contains embedded double-quotes
        # they must be doubled (see RFC-959, chapter 7, appendix 2).
        self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
        self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = io.BytesIO(b'x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)

    def test_encoding_param(self):
        encodings = ['latin-1', 'utf-8']
        for encoding in encodings:
            with self.subTest(encoding=encoding):
                # Restart server and client with the encoding under test.
                self.tearDown()
                self.setUp(encoding=encoding)
                self.assertEqual(encoding, self.client.encoding)
                self.test_retrbinary()
                self.test_storbinary()
                self.test_retrlines()
                new_dir = self.client.mkd('/non-ascii dir \xAE')
                self.check_data(new_dir, '/non-ascii dir \xAE')
        # Check default encoding
        client = ftplib.FTP(timeout=TIMEOUT)
        self.assertEqual(DEFAULT_ENCODING, client.encoding)
843
844
@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Exercise the FTP client against a dummy server bound to HOSTv6."""

    def setUp(self):
        """Start an AF_INET6 dummy server and connect a client to it."""
        server = DummyFTPServer((HOSTv6, 0),
                                af=socket.AF_INET6,
                                encoding=DEFAULT_ENCODING)
        self.server = server
        server.start()
        client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING)
        self.client = client
        client.connect(server.host, server.port)

    def tearDown(self):
        """Close the client, stop the server and close asyncore channels."""
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_af(self):
        """The connected client must report the IPv6 address family."""
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        """makeport() must cause the server to receive an EPRT command."""
        with self.client.makeport():
            last_cmd = self.server.handler_instance.last_received_cmd
            self.assertEqual(last_cmd, 'eprt')

    def test_makepasv(self):
        """makepasv() must return a reachable address and cause the
        server to receive an EPSV command.
        """
        pasv_host, pasv_port = self.client.makepasv()
        # only check that the advertised endpoint accepts a connection
        socket.create_connection((pasv_host, pasv_port),
                                 timeout=TIMEOUT).close()
        last_cmd = self.server.handler_instance.last_received_cmd
        self.assertEqual(last_cmd, 'epsv')

    def test_transfer(self):
        """RETR must deliver RETR_DATA with set_pasv(True) and then with
        set_pasv(False).
        """
        def fetch_and_check():
            chunks = []
            self.client.retrbinary(
                'retr',
                lambda data: chunks.append(
                    data.decode(self.client.encoding)))
            text = ''.join(chunks)
            self.assertEqual(len(text), len(RETR_DATA))
            self.assertEqual(text, RETR_DATA)

        for passive in (True, False):
            self.client.set_pasv(passive)
            fetch_and_check()
889
890
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Run the inherited TestFTPClass tests over TLS: the control
    connection is secured and protected data connections are requested
    before each test starts.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        """Start a dummy TLS server and connect an FTP_TLS client to it,
        both using *encoding*, then switch the client to TLS.
        """
        listen_addr = (HOST, 0)
        self.server = DummyTLS_FTPServer(listen_addr, encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
        host, port = self.server.host, self.server.port
        self.client.connect(host, port)
        # enable TLS: control connection first, then data protection
        self.client.auth()
        self.client.prot_p()
905
906
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self, encoding=DEFAULT_ENCODING):
        """Start a dummy TLS server and connect an FTP_TLS client to it.

        TLS is not negotiated here; each test does that itself.
        *encoding* is applied to both the server and the client.
        """
        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        # Pass the encoding to the client too, so that both ends agree
        # when a non-default value is requested.
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        """Close the client, stop the server and close asyncore channels."""
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def _check_list_transfer(self, secured):
        """Issue LIST and check the data socket and the final reply.

        The data socket must be an ssl.SSLSocket if and only if *secured*
        is true, and must deliver LIST_DATA.
        """
        with self.client.transfercmd('list') as sock:
            if secured:
                self.assertIsInstance(sock, ssl.SSLSocket)
            else:
                self.assertNotIsInstance(sock, ssl.SSLSocket)
            # consume from the socket; on an SSL socket this finalizes the
            # handshake and avoids "SSLError [SSL] shutdown while in init"
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_control_connection(self):
        """auth() must turn the control socket into an SSL socket."""
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        """The data connection must follow the PROT P / PROT C setting."""
        # clear text
        self._check_list_transfer(secured=False)

        # secured, after PROT P
        self.client.prot_p()
        self._check_list_transfer(secured=True)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        self._check_list_transfer(secured=False)

    def test_login(self):
        """login() must secure the control connection exactly once."""
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        """A second auth() on the same connection must raise ValueError."""
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_context(self):
        """A user-supplied SSLContext must be used for both the control
        and the data connection, and must not be combined with
        keyfile/certfile.
        """
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # context is mutually exclusive with keyfile and certfile
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)

    def test_ccc(self):
        """ccc() needs a secured control connection and must revert it
        to clear text.
        """
        # not secured yet: nothing to clear
        self.assertRaises(ValueError, self.client.ccc)
        self.client.login(secure=True)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.ccc()
        # after ccc() the socket can no longer be unwrapped
        self.assertRaises(ValueError, self.client.sock.unwrap)

    @skipUnless(False, "FIXME: bpo-32706")
    def test_check_hostname(self):
        """Hostname checking must be applied to control and data
        connections (currently always skipped, see the decorator).
        """
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list"):
                pass
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list"):
            pass
1028
1029
class TestTimeouts(TestCase):
    """Check which timeout ends up on the socket of an FTP client."""

    def setUp(self):
        """Start a one-shot server thread and make ftplib.FTP use its
        port by default.
        """
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(20)
        self.port = socket_helper.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server,
                                              daemon=True)
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # remember the class-level default port so tearDown can restore it
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        """Restore ftplib.FTP.port and wait for the server thread."""
        ftplib.FTP.port = self.old_port
        self.server_thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.server_thread = None

    def server(self):
        """Accept a single connection, send a greeting and shut down."""
        # This method sets the evt twice:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        self.sock.listen()
        # (1) Signal the caller that we are ready to accept the connection.
        self.evt.set()
        try:
            conn, _ = self.sock.accept()
        except socket.timeout:
            # nobody connected within the listening socket's timeout
            pass
        else:
            conn.sendall(b"1 Hola mundo\n")
            conn.shutdown(socket.SHUT_WR)
            # (2) Signal the caller that it is safe to close the socket.
            self.evt.set()
            conn.close()
        finally:
            self.sock.close()

    def _release(self, client):
        """Wait for the server's go-ahead, then close *client*."""
        self.evt.wait()
        client.close()

    def testTimeoutDefault(self):
        """Without a timeout argument the global default is used."""
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        self._release(client)

    def testTimeoutNone(self):
        """timeout=None overrides the global default."""
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        self._release(client)

    def testTimeoutValue(self):
        """An explicit timeout is applied; a zero timeout is rejected."""
        # a value
        client = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        self._release(client)

        # bpo-39259
        self.assertRaises(ValueError, ftplib.FTP, HOST, timeout=0)

    def testTimeoutConnect(self):
        """The timeout may be given to connect()."""
        client = ftplib.FTP()
        client.connect(HOST, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        self._release(client)

    def testTimeoutDifferentOrder(self):
        """A constructor timeout is used by a later connect()."""
        client = ftplib.FTP(timeout=30)
        client.connect(HOST)
        self.assertEqual(client.sock.gettimeout(), 30)
        self._release(client)

    def testTimeoutDirectAccess(self):
        """A timeout set on the attribute is used by connect()."""
        client = ftplib.FTP()
        client.timeout = 30
        client.connect(HOST)
        self.assertEqual(client.sock.gettimeout(), 30)
        self._release(client)
1129
1130
class MiscTestCase(TestCase):
    """Checks on the ftplib module namespace."""

    def test__all__(self):
        """Run support.check__all__() on ftplib, passing the names below
        as its blacklist argument.
        """
        excluded = {
            'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
            'Error', 'parse150', 'parse227', 'parse229', 'parse257',
            'print_line', 'ftpcp', 'test',
        }
        support.check__all__(self, ftplib, blacklist=excluded)
1137
1138
def test_main():
    """Run every test case of this module between
    support.threading_setup() and support.threading_cleanup().
    """
    test_cases = (
        TestFTPClass,
        TestTimeouts,
        TestIPv6Environment,
        TestTLS_FTPClassMixin,
        TestTLS_FTPClass,
        MiscTestCase,
    )

    saved_thread_info = support.threading_setup()
    try:
        support.run_unittest(*test_cases)
    finally:
        # always hand the saved state back, even if the run raised
        support.threading_cleanup(*saved_thread_info)
1150
1151
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1154