1"""Test script for poplib module.""" 2 3# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL 4# a real test suite 5 6import poplib 7import socket 8import os 9import errno 10import threading 11 12import unittest 13from unittest import TestCase, skipUnless 14from test import support as test_support 15from test.support import hashlib_helper 16from test.support import socket_helper 17from test.support import threading_helper 18 19import warnings 20with warnings.catch_warnings(): 21 warnings.simplefilter('ignore', DeprecationWarning) 22 import asynchat 23 import asyncore 24 25HOST = socket_helper.HOST 26PORT = 0 27 28SUPPORTS_SSL = False 29if hasattr(poplib, 'POP3_SSL'): 30 import ssl 31 32 SUPPORTS_SSL = True 33 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 34 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 35 36requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') 37 38# the dummy data returned by server when LIST and RETR commands are issued 39LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' 40RETR_RESP = b"""From: postmaster@python.org\ 41\r\nContent-Type: text/plain\r\n\ 42MIME-Version: 1.0\r\n\ 43Subject: Dummy\r\n\ 44\r\n\ 45line1\r\n\ 46line2\r\n\ 47line3\r\n\ 48.\r\n""" 49 50 51class DummyPOP3Handler(asynchat.async_chat): 52 53 CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']} 54 enable_UTF8 = False 55 56 def __init__(self, conn): 57 asynchat.async_chat.__init__(self, conn) 58 self.set_terminator(b"\r\n") 59 self.in_buffer = [] 60 self.push('+OK dummy pop3 server ready. <timestamp>') 61 self.tls_active = False 62 self.tls_starting = False 63 64 def collect_incoming_data(self, data): 65 self.in_buffer.append(data) 66 67 def found_terminator(self): 68 line = b''.join(self.in_buffer) 69 line = str(line, 'ISO-8859-1') 70 self.in_buffer = [] 71 cmd = line.split(' ')[0].lower() 72 space = line.find(' ') 73 if space != -1: 74 arg = line[space + 1:] 75 else: 76 arg = "" 77 if hasattr(self, 'cmd_' + cmd): 78 method = getattr(self, 'cmd_' + cmd) 79 method(arg) 80 else: 81 self.push('-ERR unrecognized POP3 command "%s".' %cmd) 82 83 def handle_error(self): 84 raise 85 86 def push(self, data): 87 asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') 88 89 def cmd_echo(self, arg): 90 # sends back the received string (used by the test suite) 91 self.push(arg) 92 93 def cmd_user(self, arg): 94 if arg != "guido": 95 self.push("-ERR no such user") 96 self.push('+OK password required') 97 98 def cmd_pass(self, arg): 99 if arg != "python": 100 self.push("-ERR wrong password") 101 self.push('+OK 10 messages') 102 103 def cmd_stat(self, arg): 104 self.push('+OK 10 100') 105 106 def cmd_list(self, arg): 107 if arg: 108 self.push('+OK %s %s' % (arg, arg)) 109 else: 110 self.push('+OK') 111 asynchat.async_chat.push(self, LIST_RESP) 112 113 cmd_uidl = cmd_list 114 115 def cmd_retr(self, arg): 116 self.push('+OK %s bytes' %len(RETR_RESP)) 117 asynchat.async_chat.push(self, RETR_RESP) 118 119 cmd_top = cmd_retr 120 121 def cmd_dele(self, arg): 122 self.push('+OK message marked for deletion.') 123 124 def cmd_noop(self, arg): 125 self.push('+OK done nothing.') 126 127 def cmd_rpop(self, arg): 128 self.push('+OK done nothing.') 129 130 def cmd_apop(self, arg): 131 self.push('+OK done nothing.') 132 133 def cmd_quit(self, arg): 134 self.push('+OK closing.') 135 self.close_when_done() 136 137 def _get_capas(self): 138 _capas = dict(self.CAPAS) 139 if not self.tls_active and SUPPORTS_SSL: 140 _capas['STLS'] = [] 141 return _capas 142 143 def cmd_capa(self, arg): 144 self.push('+OK Capability list follows') 145 if self._get_capas(): 146 for cap, params in self._get_capas().items(): 147 _ln = [cap] 148 if params: 149 _ln.extend(params) 150 self.push(' '.join(_ln)) 151 self.push('.') 152 153 def cmd_utf8(self, arg): 154 self.push('+OK I know RFC6856' 155 if self.enable_UTF8 156 else '-ERR What is UTF8?!') 157 158 if SUPPORTS_SSL: 159 160 def cmd_stls(self, arg): 161 if self.tls_active is False: 162 self.push('+OK Begin TLS negotiation') 163 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 164 context.load_cert_chain(CERTFILE) 165 tls_sock = context.wrap_socket(self.socket, 166 server_side=True, 167 do_handshake_on_connect=False, 168 suppress_ragged_eofs=False) 169 self.del_channel() 170 self.set_socket(tls_sock) 171 self.tls_active = True 172 self.tls_starting = True 173 self.in_buffer = [] 174 self._do_tls_handshake() 175 else: 176 self.push('-ERR Command not permitted when TLS active') 177 178 def _do_tls_handshake(self): 179 try: 180 self.socket.do_handshake() 181 except ssl.SSLError as err: 182 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 183 ssl.SSL_ERROR_WANT_WRITE): 184 return 185 elif err.args[0] == ssl.SSL_ERROR_EOF: 186 return self.handle_close() 187 # TODO: SSLError does not expose alert information 188 elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or 189 "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]): 190 return self.handle_close() 191 raise 192 except OSError as err: 193 if err.args[0] == errno.ECONNABORTED: 194 return self.handle_close() 195 else: 196 self.tls_active = True 197 self.tls_starting = False 198 199 def handle_read(self): 200 if self.tls_starting: 201 self._do_tls_handshake() 202 else: 203 try: 204 asynchat.async_chat.handle_read(self) 205 except ssl.SSLEOFError: 206 self.handle_close() 207 208class DummyPOP3Server(asyncore.dispatcher, threading.Thread): 209 210 handler = DummyPOP3Handler 211 212 def __init__(self, address, af=socket.AF_INET): 213 threading.Thread.__init__(self) 214 asyncore.dispatcher.__init__(self) 215 self.daemon = True 216 self.create_socket(af, socket.SOCK_STREAM) 217 self.bind(address) 218 self.listen(5) 219 self.active = False 220 self.active_lock = threading.Lock() 221 self.host, self.port = self.socket.getsockname()[:2] 222 self.handler_instance = None 223 224 def start(self): 225 assert not self.active 226 self.__flag = threading.Event() 227 threading.Thread.start(self) 228 self.__flag.wait() 229 230 def run(self): 231 self.active = True 232 self.__flag.set() 233 try: 234 while self.active and asyncore.socket_map: 235 with self.active_lock: 236 asyncore.loop(timeout=0.1, count=1) 237 finally: 238 asyncore.close_all(ignore_all=True) 239 240 def stop(self): 241 assert self.active 242 self.active = False 243 self.join() 244 245 def handle_accepted(self, conn, addr): 246 self.handler_instance = self.handler(conn) 247 248 def handle_connect(self): 249 self.close() 250 handle_read = handle_connect 251 252 def writable(self): 253 return 0 254 255 def handle_error(self): 256 raise 257 258 259class TestPOP3Class(TestCase): 260 def assertOK(self, resp): 261 self.assertTrue(resp.startswith(b"+OK")) 262 263 def setUp(self): 264 self.server = DummyPOP3Server((HOST, PORT)) 265 self.server.start() 266 self.client = poplib.POP3(self.server.host, self.server.port, 267 timeout=test_support.LOOPBACK_TIMEOUT) 268 269 def tearDown(self): 270 self.client.close() 271 self.server.stop() 272 # Explicitly clear the attribute to prevent dangling thread 273 self.server = None 274 275 def test_getwelcome(self): 276 self.assertEqual(self.client.getwelcome(), 277 b'+OK dummy pop3 server ready. <timestamp>') 278 279 def test_exceptions(self): 280 self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') 281 282 def test_user(self): 283 self.assertOK(self.client.user('guido')) 284 self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 285 286 def test_pass_(self): 287 self.assertOK(self.client.pass_('python')) 288 self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 289 290 def test_stat(self): 291 self.assertEqual(self.client.stat(), (10, 100)) 292 293 def test_list(self): 294 self.assertEqual(self.client.list()[1:], 295 ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'], 296 25)) 297 self.assertTrue(self.client.list('1').endswith(b"OK 1 1")) 298 299 def test_retr(self): 300 expected = (b'+OK 116 bytes', 301 [b'From: postmaster@python.org', b'Content-Type: text/plain', 302 b'MIME-Version: 1.0', b'Subject: Dummy', 303 b'', b'line1', b'line2', b'line3'], 304 113) 305 foo = self.client.retr('foo') 306 self.assertEqual(foo, expected) 307 308 def test_too_long_lines(self): 309 self.assertRaises(poplib.error_proto, self.client._shortcmd, 310 'echo +%s' % ((poplib._MAXLINE + 10) * 'a')) 311 312 def test_dele(self): 313 self.assertOK(self.client.dele('foo')) 314 315 def test_noop(self): 316 self.assertOK(self.client.noop()) 317 318 def test_rpop(self): 319 self.assertOK(self.client.rpop('foo')) 320 321 @hashlib_helper.requires_hashdigest('md5', openssl=True) 322 def test_apop_normal(self): 323 self.assertOK(self.client.apop('foo', 'dummypassword')) 324 325 @hashlib_helper.requires_hashdigest('md5', openssl=True) 326 def test_apop_REDOS(self): 327 # Replace welcome with very long evil welcome. 328 # NB The upper bound on welcome length is currently 2048. 329 # At this length, evil input makes each apop call take 330 # on the order of milliseconds instead of microseconds. 331 evil_welcome = b'+OK' + (b'<' * 1000000) 332 with test_support.swap_attr(self.client, 'welcome', evil_welcome): 333 # The evil welcome is invalid, so apop should throw. 334 self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb') 335 336 def test_top(self): 337 expected = (b'+OK 116 bytes', 338 [b'From: postmaster@python.org', b'Content-Type: text/plain', 339 b'MIME-Version: 1.0', b'Subject: Dummy', b'', 340 b'line1', b'line2', b'line3'], 341 113) 342 self.assertEqual(self.client.top(1, 1), expected) 343 344 def test_uidl(self): 345 self.client.uidl() 346 self.client.uidl('foo') 347 348 def test_utf8_raises_if_unsupported(self): 349 self.server.handler.enable_UTF8 = False 350 self.assertRaises(poplib.error_proto, self.client.utf8) 351 352 def test_utf8(self): 353 self.server.handler.enable_UTF8 = True 354 expected = b'+OK I know RFC6856' 355 result = self.client.utf8() 356 self.assertEqual(result, expected) 357 358 def test_capa(self): 359 capa = self.client.capa() 360 self.assertTrue('IMPLEMENTATION' in capa.keys()) 361 362 def test_quit(self): 363 resp = self.client.quit() 364 self.assertTrue(resp) 365 self.assertIsNone(self.client.sock) 366 self.assertIsNone(self.client.file) 367 368 @requires_ssl 369 def test_stls_capa(self): 370 capa = self.client.capa() 371 self.assertTrue('STLS' in capa.keys()) 372 373 @requires_ssl 374 def test_stls(self): 375 expected = b'+OK Begin TLS negotiation' 376 resp = self.client.stls() 377 self.assertEqual(resp, expected) 378 379 @requires_ssl 380 def test_stls_context(self): 381 expected = b'+OK Begin TLS negotiation' 382 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 383 ctx.load_verify_locations(CAFILE) 384 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 385 self.assertEqual(ctx.check_hostname, True) 386 with self.assertRaises(ssl.CertificateError): 387 resp = self.client.stls(context=ctx) 388 self.client = poplib.POP3("localhost", self.server.port, 389 timeout=test_support.LOOPBACK_TIMEOUT) 390 resp = self.client.stls(context=ctx) 391 self.assertEqual(resp, expected) 392 393 394if SUPPORTS_SSL: 395 from test.test_ftplib import SSLConnection 396 397 class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler): 398 399 def __init__(self, conn): 400 asynchat.async_chat.__init__(self, conn) 401 self.secure_connection() 402 self.set_terminator(b"\r\n") 403 self.in_buffer = [] 404 self.push('+OK dummy pop3 server ready. <timestamp>') 405 self.tls_active = True 406 self.tls_starting = False 407 408 409@requires_ssl 410class TestPOP3_SSLClass(TestPOP3Class): 411 # repeat previous tests by using poplib.POP3_SSL 412 413 def setUp(self): 414 self.server = DummyPOP3Server((HOST, PORT)) 415 self.server.handler = DummyPOP3_SSLHandler 416 self.server.start() 417 self.client = poplib.POP3_SSL(self.server.host, self.server.port) 418 419 def test__all__(self): 420 self.assertIn('POP3_SSL', poplib.__all__) 421 422 def test_context(self): 423 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 424 ctx.check_hostname = False 425 ctx.verify_mode = ssl.CERT_NONE 426 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 427 self.server.port, keyfile=CERTFILE, context=ctx) 428 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 429 self.server.port, certfile=CERTFILE, context=ctx) 430 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 431 self.server.port, keyfile=CERTFILE, 432 certfile=CERTFILE, context=ctx) 433 434 self.client.quit() 435 self.client = poplib.POP3_SSL(self.server.host, self.server.port, 436 context=ctx) 437 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 438 self.assertIs(self.client.sock.context, ctx) 439 self.assertTrue(self.client.noop().startswith(b'+OK')) 440 441 def test_stls(self): 442 self.assertRaises(poplib.error_proto, self.client.stls) 443 444 test_stls_context = test_stls 445 446 def test_stls_capa(self): 447 capa = self.client.capa() 448 self.assertFalse('STLS' in capa.keys()) 449 450 451@requires_ssl 452class TestPOP3_TLSClass(TestPOP3Class): 453 # repeat previous tests by using poplib.POP3.stls() 454 455 def setUp(self): 456 self.server = DummyPOP3Server((HOST, PORT)) 457 self.server.start() 458 self.client = poplib.POP3(self.server.host, self.server.port, 459 timeout=test_support.LOOPBACK_TIMEOUT) 460 self.client.stls() 461 462 def tearDown(self): 463 if self.client.file is not None and self.client.sock is not None: 464 try: 465 self.client.quit() 466 except poplib.error_proto: 467 # happens in the test_too_long_lines case; the overlong 468 # response will be treated as response to QUIT and raise 469 # this exception 470 self.client.close() 471 self.server.stop() 472 # Explicitly clear the attribute to prevent dangling thread 473 self.server = None 474 475 def test_stls(self): 476 self.assertRaises(poplib.error_proto, self.client.stls) 477 478 test_stls_context = test_stls 479 480 def test_stls_capa(self): 481 capa = self.client.capa() 482 self.assertFalse(b'STLS' in capa.keys()) 483 484 485class TestTimeouts(TestCase): 486 487 def setUp(self): 488 self.evt = threading.Event() 489 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 490 self.sock.settimeout(60) # Safety net. Look issue 11812 491 self.port = socket_helper.bind_port(self.sock) 492 self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock)) 493 self.thread.daemon = True 494 self.thread.start() 495 self.evt.wait() 496 497 def tearDown(self): 498 self.thread.join() 499 # Explicitly clear the attribute to prevent dangling thread 500 self.thread = None 501 502 def server(self, evt, serv): 503 serv.listen() 504 evt.set() 505 try: 506 conn, addr = serv.accept() 507 conn.send(b"+ Hola mundo\n") 508 conn.close() 509 except TimeoutError: 510 pass 511 finally: 512 serv.close() 513 514 def testTimeoutDefault(self): 515 self.assertIsNone(socket.getdefaulttimeout()) 516 socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT) 517 try: 518 pop = poplib.POP3(HOST, self.port) 519 finally: 520 socket.setdefaulttimeout(None) 521 self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT) 522 pop.close() 523 524 def testTimeoutNone(self): 525 self.assertIsNone(socket.getdefaulttimeout()) 526 socket.setdefaulttimeout(30) 527 try: 528 pop = poplib.POP3(HOST, self.port, timeout=None) 529 finally: 530 socket.setdefaulttimeout(None) 531 self.assertIsNone(pop.sock.gettimeout()) 532 pop.close() 533 534 def testTimeoutValue(self): 535 pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT) 536 self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT) 537 pop.close() 538 with self.assertRaises(ValueError): 539 poplib.POP3(HOST, self.port, timeout=0) 540 541 542def setUpModule(): 543 thread_info = threading_helper.threading_setup() 544 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 545 546 547if __name__ == '__main__': 548 unittest.main() 549