• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import asyncore
8import asynchat
9import socket
10import StringIO
11import errno
12import os
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18from unittest import TestCase, SkipTest, skipUnless
19from test import test_support
20from test.test_support import HOST, HOSTv6
21threading = test_support.import_module('threading')
22
23TIMEOUT = 3
24# the dummy data returned by server over the data channel when
25# RETR, LIST and NLST commands are issued
26RETR_DATA = 'abcde12345\r\n' * 1000
27LIST_DATA = 'foo\r\nbar\r\n'
28NLST_DATA = 'foo\r\nbar\r\n'
29
30
class DummyDTPHandler(asynchat.async_chat):
    """Server-side endpoint of a dummy FTP data connection.

    Everything read from the data socket is appended to
    ``baseclass.last_received_data`` (``baseclass`` is the control-channel
    handler that created this object), and the control channel is told
    ``226 transfer complete`` once the data connection closes.
    """

    # Guards handle_close() against running its body more than once.
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every new data connection starts with an empty receive buffer.
        baseclass.last_received_data = ''

    def handle_read(self):
        chunk = self.recv(1024)
        self.baseclass.last_received_data += chunk

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def handle_error(self):
        # Re-raise whatever the asyncore loop caught instead of logging it.
        raise
53
54
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Incoming bytes are split on CRLF; the first word of each line is
    lower-cased and dispatched to the matching ``cmd_<name>`` method with
    the remainder of the line as its single argument.  Unknown commands are
    answered with a 550 reply.

    Attributes the tests inspect or set:
      last_received_cmd  -- lower-cased name of the most recent command
      last_received_data -- data collected by the data-channel handler
      next_response      -- if non-empty, pushed once before the next
                            command's regular reply, then cleared
      rest               -- argument of the last REST command (or None)
      next_retr_data     -- payload sent by the next RETR
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator("\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        line = ''.join(self.in_buffer)
        self.in_buffer = []
        if self.next_response:
            # A canned reply queued by a test goes out before the real one.
            self.push(self.next_response)
            self.next_response = ''
        # "CMD arg with spaces" -> ("CMD", "arg with spaces"); a line without
        # a space yields an empty argument.
        name, _, arg = line.partition(' ')
        cmd = name.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        # Re-raise whatever the asyncore loop caught instead of logging it.
        raise

    def push(self, data):
        """Queue *data* for sending, terminated by CRLF."""
        asynchat.async_chat.push(self, data + '\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2".  Build a real list so that indexing
        # and slicing do not depend on map() returning one.
        fields = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' % tuple(fields[:4])
        port = (fields[4] * 256) + fields[5]
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        sock = socket.socket()
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            ip, port = sock.getsockname()[:2]
            ip = ip.replace('.', ',')
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
        finally:
            # The listener serves exactly one data connection; close it even
            # when accept() times out so it is not leaked.
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # arg looks like "|af|ip|port|"; its first character is the delimiter.
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        sock = socket.socket(socket.AF_INET6)
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
        finally:
            # See cmd_pasv: the one-shot listener must not be leaked.
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        # A preceding REST selects the offset the transfer starts from.
        offset = int(self.rest) if self.rest is not None else 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
211
212
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    ``start()`` returns only once the thread has entered ``run()``;
    ``stop()`` clears the ``active`` flag and joins the thread.  The most
    recently created connection handler is kept in ``handler_instance`` so
    tests can inspect the server side of the conversation.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        # When address used port 0, this is the port the OS actually chose.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        # Block until run() has signalled that it is executing.
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                # "with" releases the lock even if a handler re-raises out
                # of asyncore.loop().
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            # Always tear down every channel, including after an error.
            asyncore.close_all(ignore_all=True)

    def stop(self):
        assert self.active
        self.active = False
        self.join()

    def handle_accept(self):
        # dispatcher.accept() may return None instead of a (conn, addr)
        # pair; there is nothing to hand over in that case.
        pair = self.accept()
        if pair is None:
            return
        conn, addr = pair
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise whatever the asyncore loop caught instead of logging it.
        raise
261
262
if ssl is not None:

    # Certificate/key bundle and CA file, looked up next to this module.
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")

    class SSLConnection(object, asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL.

        The handshake and the shutdown are both driven from the read/write
        event handlers: while ``_ssl_accepting`` or ``_ssl_closing`` is set,
        events are routed to the corresponding ``_do_ssl_*`` method instead
        of the regular dispatcher handlers.
        """

        _ssl_accepting = False
        _ssl_closing = False

        def secure_connection(self):
            """Wrap the current socket in a server-side SSL socket."""
            tls_sock = ssl.wrap_socket(self.socket,
                                       suppress_ragged_eofs=False,
                                       certfile=CERTFILE, server_side=True,
                                       do_handshake_on_connect=False,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            # Swap the channel's socket: unregister, then register the
            # wrapped one.
            self.del_channel()
            self.set_socket(tls_sock)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            try:
                self.socket.do_handshake()
            except ssl.SSLError as exc:
                code = exc.args[0]
                if code in (ssl.SSL_ERROR_WANT_READ,
                            ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet; retried on the next socket event.
                    return
                if code != ssl.SSL_ERROR_EOF:
                    raise
                return self.handle_close()
            except socket.error as exc:
                if exc.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as exc:
                if exc.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Shutdown still in progress; _ssl_closing stays set so
                    # the next event comes back here.
                    return
            except socket.error:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            # The underlying close is skipped only when a '_ccc' attribute
            # exists and is something other than False.
            if getattr(self, '_ccc', False) is False:
                super(SSLConnection, self).close()

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
                return
            if self._ssl_closing:
                self._do_ssl_shutdown()
                return
            super(SSLConnection, self).handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
                return
            if self._ssl_closing:
                self._do_ssl_shutdown()
                return
            super(SSLConnection, self).handle_write_event()

        def send(self, data):
            """Send *data*; report 0 bytes sent on retryable/EOF SSL errors."""
            try:
                return super(SSLConnection, self).send(data)
            except ssl.SSLError as exc:
                if exc.args[0] not in (ssl.SSL_ERROR_EOF,
                                       ssl.SSL_ERROR_ZERO_RETURN,
                                       ssl.SSL_ERROR_WANT_READ,
                                       ssl.SSL_ERROR_WANT_WRITE):
                    raise
                return 0

        def recv(self, buffer_size):
            """Receive data; return b'' on retryable or EOF SSL errors."""
            try:
                return super(SSLConnection, self).recv(buffer_size)
            except ssl.SSLError as exc:
                code = exc.args[0]
                if code in (ssl.SSL_ERROR_WANT_READ,
                            ssl.SSL_ERROR_WANT_WRITE):
                    return b''
                if code in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                    # The peer ended the TLS session: treat as a close.
                    self.handle_close()
                    return b''
                raise

        def handle_error(self):
            # Re-raise whatever the asyncore loop caught instead of logging.
            raise

        def close(self):
            sock = self.socket
            if isinstance(sock, ssl.SSLSocket) and sock._sslobj is not None:
                # Still wrapped: go through the SSL shutdown first.
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).close()
366
367
368    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
369        """A DummyDTPHandler subclass supporting TLS/SSL."""
370
371        def __init__(self, conn, baseclass):
372            DummyDTPHandler.__init__(self, conn, baseclass)
373            if self.baseclass.secure_data_channel:
374                self.secure_connection()
375
376
377    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
378        """A DummyFTPHandler subclass supporting TLS/SSL."""
379
380        dtp_handler = DummyTLS_DTPHandler
381
382        def __init__(self, conn):
383            DummyFTPHandler.__init__(self, conn)
384            self.secure_data_channel = False
385
386        def cmd_auth(self, line):
387            """Set up secure control channel."""
388            self.push('234 AUTH TLS successful')
389            self.secure_connection()
390
391        def cmd_pbsz(self, line):
392            """Negotiate size of buffer for secure data transfer.
393            For TLS/SSL the only valid value for the parameter is '0'.
394            Any other value is accepted but ignored.
395            """
396            self.push('200 PBSZ=0 successful.')
397
398        def cmd_prot(self, line):
399            """Setup un/secure data channel."""
400            arg = line.upper()
401            if arg == 'C':
402                self.push('200 Protection set to Clear')
403                self.secure_data_channel = False
404            elif arg == 'P':
405                self.push('200 Protection set to Private')
406                self.secure_data_channel = True
407            else:
408                self.push("502 Unrecognized PROT type (use C or P).")
409
410
411    class DummyTLS_FTPServer(DummyFTPServer):
412        handler = DummyTLS_FTPHandler
413
414
class TestFTPClass(TestCase):
    """Exercise ftplib.FTP against the in-process dummy server.

    setUp starts a fresh DummyFTPServer on an OS-chosen port and connects
    a client to it; tearDown closes the client and stops the server.
    """

    def setUp(self):
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=10)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        # The dummy server's ECHO command sends its argument back verbatim,
        # which lets the test pick the reply code.
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        # Make the server answer the next command with an unexpected reply.
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dirname = self.client.mkd('/foo')
        self.assertEqual(dirname, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        resp = self.client.cwd('/foo')
        self.assertEqual(resp, '250 cwd ok')

    def test_pwd(self):
        dirname = self.client.pwd()
        self.assertEqual(dirname, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_retrbinary(self):
        received = []
        self.client.retrbinary('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', received.append, rest=rest)
            self.assertEqual(''.join(received), RETR_DATA[rest:],
                             msg='rest test case %d %d %d' % (rest,
                                                              len(''.join(received)),
                                                              len(RETR_DATA[rest:])))

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = StringIO.StringIO(RETR_DATA)
        self.client.storbinary('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        f = StringIO.StringIO(RETR_DATA)
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
        self.client.storlines('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lambda x: lines.append(x))
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_makeport(self):
        # makeport() hands back the listening socket it created; close it
        # explicitly so the test does not leak it.
        sock = self.client.makeport()
        try:
            # IPv4 is in use, just make sure send_eprt has not been used
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'port')
        finally:
            sock.close()

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = StringIO.StringIO('x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
584
585
@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Run the address-family dependent FTP tests over an IPv6 socket."""

    @classmethod
    def setUpClass(cls):
        # socket.has_ipv6 only says the build supports IPv6; actually try
        # to bind an AF_INET6 server and skip the class if that fails.
        try:
            probe = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
        except socket.error:
            raise SkipTest("IPv6 not enabled")
        # The probe was never started; close it so its listening socket is
        # released and removed from asyncore's socket map.
        probe.close()

    def setUp(self):
        self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP()
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        # makeport() hands back the listening socket it created; close it
        # explicitly so the test does not leak it.
        sock = self.client.makeport()
        try:
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'eprt')
        finally:
            sock.close()

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')

    def test_transfer(self):
        def retr():
            received = []
            self.client.retrbinary('retr', received.append)
            self.assertEqual(''.join(received), RETR_DATA)
        # Same download in passive mode first, then in active mode.
        self.client.set_pasv(True)
        retr()
        self.client.set_pasv(False)
        retr()
628
629
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    def setUp(self):
        server = DummyTLS_FTPServer((HOST, 0))
        self.server = server
        server.start()
        client = ftplib.FTP_TLS(timeout=10)
        self.client = client
        client.connect(server.host, server.port)
        # enable TLS: AUTH secures the control channel, PROT P the data one
        client.auth()
        client.prot_p()
644
645
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def _check_list_transfer(self, secured):
        """Open a LIST data connection and check whether it is TLS-wrapped."""
        sock = self.client.transfercmd('list')
        if secured:
            self.assertIsInstance(sock, ssl.SSLSocket)
        else:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        self._check_list_transfer(secured=False)

        # secured, after PROT P
        self.client.prot_p()
        self._check_list_transfer(secured=True)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        self._check_list_transfer(secured=False)

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_auth_ssl(self):
        try:
            self.client.ssl_version = ssl.PROTOCOL_SSLv23
            self.client.auth()
            self.assertRaises(ValueError, self.client.auth)
        finally:
            self.client.ssl_version = ssl.PROTOCOL_TLSv1

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # A context cannot be combined with keyfile and/or certfile.
        conflicting = (
            {'keyfile': CERTFILE},
            {'certfile': CERTFILE},
            {'certfile': CERTFILE, 'keyfile': CERTFILE},
        )
        for extra in conflicting:
            self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx,
                              **extra)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        data_sock = self.client.transfercmd('list')
        try:
            self.assertIs(data_sock.context, ctx)
            self.assertIsInstance(data_sock, ssl.SSLSocket)
        finally:
            data_sock.close()

    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        host, port = self.server.host, self.server.port

        # 127.0.0.1 doesn't match SAN
        self.client.connect(host, port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(host, port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            self.client.transfercmd("list").close()
        self.client.quit()

        # Connecting by name must pass verification on both channels.
        self.client.connect("localhost", port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", port)
        self.client.prot_p()
        self.client.transfercmd("list").close()
758
759
class TestTimeouts(TestCase):
    """Check how ftplib.FTP applies socket timeouts.

    Each test talks to a one-shot server thread that accepts a single
    connection, sends one greeting line and closes.  ftplib.FTP.port is
    pointed at that server for the duration of the test so that
    ``ftplib.FTP(HOST)`` reaches it without an explicit port.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(10)
        self.port = test_support.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server,
                                              args=(self.evt, self.sock))
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # Remember the class-wide default port so tearDown can restore it;
        # otherwise the override would leak into every later FTP instance.
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.evt.wait()
        # The event may already be set from an earlier step, so also join
        # the thread to be sure the server has really finished.
        self.server_thread.join()

    def server(self, evt, serv):
        # This method sets the evt 3 times:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        #  3) when we have closed the socket
        serv.listen(5)
        # (1) Signal the caller that we are ready to accept the connection.
        evt.set()
        try:
            conn, addr = serv.accept()
        except socket.timeout:
            pass
        else:
            conn.send("1 Hola mundo\n")
            # (2) Signal the caller that it is safe to close the socket.
            evt.set()
            conn.close()
        finally:
            serv.close()
            # (3) Signal the caller that we are done.
            evt.set()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(ftp.sock.gettimeout())
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutConnect(self):
        # timeout passed to connect() rather than to the constructor
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        # timeout passed to the constructor, host passed to connect()
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        # timeout set through the instance attribute
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()
850
851
def test_main():
    """Run all test classes of this module via test_support.run_unittest.

    The call is bracketed by test_support.threading_setup() and
    threading_cleanup(), with the cleanup executed even if the run raises.
    """
    suites = (
        TestFTPClass,
        TestTimeouts,
        TestIPv6Environment,
        TestTLS_FTPClassMixin,
        TestTLS_FTPClass,
    )
    saved_state = test_support.threading_setup()
    try:
        test_support.run_unittest(*suites)
    finally:
        test_support.threading_cleanup(*saved_state)
862
863
864if __name__ == '__main__':
865    test_main()
866