"""Test script for poplib module."""

# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
# a real test suite

import poplib
import asyncore
import asynchat
import socket
import os
import errno
import threading

from unittest import TestCase, skipUnless
from test import support as test_support

HOST = test_support.HOST
# Port 0 lets the OS pick a free port; the server records the real one.
PORT = 0

SUPPORTS_SSL = False
if hasattr(poplib, 'POP3_SSL'):
    import ssl

    SUPPORTS_SSL = True
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")

requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')

# the dummy data returned by server when LIST and RETR commands are issued
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
RETR_RESP = (b"From: postmaster@python.org\r\n"
             b"Content-Type: text/plain\r\n"
             b"MIME-Version: 1.0\r\n"
             b"Subject: Dummy\r\n"
             b"\r\n"
             b"line1\r\n"
             b"line2\r\n"
             b"line3\r\n"
             b".\r\n")


class DummyPOP3Handler(asynchat.async_chat):
    """Per-connection handler of the dummy POP3 server.

    Each received line is split into a command word and an argument and
    dispatched to the matching ``cmd_<name>`` method; unknown commands get
    an ``-ERR`` reply.
    """

    CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
    enable_UTF8 = False

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.push('+OK dummy pop3 server ready. <timestamp>')
        self.tls_active = False
        self.tls_starting = False

    def collect_incoming_data(self, data):
        """Buffer a chunk of incoming bytes until the terminator is seen."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered line and dispatch it to a ``cmd_*`` method."""
        line = str(b''.join(self.in_buffer), 'ISO-8859-1')
        self.in_buffer = []
        # Everything up to the first space is the command, the rest is the
        # argument ("" when there is no space at all).
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('-ERR unrecognized POP3 command "%s".' % cmd)

    def handle_error(self):
        # Re-raise the exception currently being handled instead of letting
        # the base class deal with it.
        raise

    def push(self, data):
        """Send the str *data* as one CRLF-terminated ISO-8859-1 line."""
        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        # Reply with exactly one status line: pushing '+OK' after '-ERR'
        # would leave a stale response queued for the next command.
        if arg != "guido":
            self.push("-ERR no such user")
        else:
            self.push('+OK password required')

    def cmd_pass(self, arg):
        # Same single-reply rule as cmd_user.
        if arg != "python":
            self.push("-ERR wrong password")
        else:
            self.push('+OK 10 messages')

    def cmd_stat(self, arg):
        self.push('+OK 10 100')

    def cmd_list(self, arg):
        """Answer LIST/UIDL: one status line with *arg*, or the full listing."""
        if arg:
            self.push('+OK %s %s' % (arg, arg))
        else:
            self.push('+OK')
            # LIST_RESP is already CRLF-terminated bytes, so bypass the
            # encoding done by self.push().
            asynchat.async_chat.push(self, LIST_RESP)

    cmd_uidl = cmd_list

    def cmd_retr(self, arg):
        """Answer RETR/TOP with the canned message, whatever *arg* is."""
        self.push('+OK %s bytes' % len(RETR_RESP))
        asynchat.async_chat.push(self, RETR_RESP)

    cmd_top = cmd_retr

    def cmd_dele(self, arg):
        self.push('+OK message marked for deletion.')

    def cmd_noop(self, arg):
        self.push('+OK done nothing.')

    def cmd_rpop(self, arg):
        self.push('+OK done nothing.')

    def cmd_apop(self, arg):
        self.push('+OK done nothing.')

    def cmd_quit(self, arg):
        self.push('+OK closing.')
        self.close_when_done()

    def _get_capas(self):
        """Return a copy of CAPAS, plus STLS when TLS can still be started."""
        _capas = dict(self.CAPAS)
        if not self.tls_active and SUPPORTS_SSL:
            _capas['STLS'] = []
        return _capas

    def cmd_capa(self, arg):
        """Send the capability list, one capability per line, ended by '.'."""
        self.push('+OK Capability list follows')
        capas = self._get_capas()
        if capas:
            for cap, params in capas.items():
                _ln = [cap]
                if params:
                    _ln.extend(params)
                self.push(' '.join(_ln))
        self.push('.')

    def cmd_utf8(self, arg):
        self.push('+OK I know RFC6856'
                  if self.enable_UTF8
                  else '-ERR What is UTF8?!')

    if SUPPORTS_SSL:

        def cmd_stls(self, arg):
            """Switch the connection to TLS, unless TLS is already active."""
            if self.tls_active is False:
                self.push('+OK Begin TLS negotiation')
                # Explicit server-side protocol; SSLContext() without an
                # argument is deprecated.
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(CERTFILE)
                tls_sock = context.wrap_socket(self.socket,
                                               server_side=True,
                                               do_handshake_on_connect=False,
                                               suppress_ragged_eofs=False)
                # Swap the plain socket for the TLS one in the asyncore map.
                self.del_channel()
                self.set_socket(tls_sock)
                self.tls_active = True
                self.tls_starting = True
                self.in_buffer = []
                self._do_tls_handshake()
            else:
                self.push('-ERR Command not permitted when TLS active')

        def _do_tls_handshake(self):
            """Advance the non-blocking TLS handshake by one step.

            While the handshake needs more I/O, ``tls_starting`` stays set so
            that handle_read() calls this method again.
            """
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                # TODO: SSLError does not expose alert information
                elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
                      "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
                    return self.handle_close()
                raise
            except OSError as err:
                # NOTE(review): any other OSError is swallowed here and the
                # handshake is retried on the next read -- confirm intended.
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self.tls_active = True
                self.tls_starting = False

        def handle_read(self):
            """Finish a pending handshake, else read application data."""
            if self.tls_starting:
                self._do_tls_handshake()
            else:
                try:
                    asynchat.async_chat.handle_read(self)
                except ssl.SSLEOFError:
                    self.handle_close()


class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
    """Listening socket whose asyncore loop runs in a daemon thread.

    Every accepted connection is wrapped in ``self.handler``; the most
    recent handler is kept in ``handler_instance``.
    """

    handler = DummyPOP3Handler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        # Record the address actually bound (the port may have been 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        """Start the thread and block until run() has marked it active."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        try:
            # One short poll per iteration so stop() is noticed promptly.
            while self.active and asyncore.socket_map:
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to exit and wait for the thread to finish."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return False

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise


class TestPOP3Class(TestCase):
    """Exercise poplib.POP3 against the dummy server over a plain socket."""

    def assertOK(self, resp):
        self.assertTrue(resp.startswith(b"+OK"))

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(),
                         b'+OK dummy pop3 server ready. <timestamp>')

    def test_exceptions(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')

    def test_user(self):
        self.assertOK(self.client.user('guido'))
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

    def test_pass_(self):
        self.assertOK(self.client.pass_('python'))
        # A wrong password must be rejected (this used to re-test user()).
        self.assertRaises(poplib.error_proto, self.client.pass_, 'invalid')

    def test_stat(self):
        self.assertEqual(self.client.stat(), (10, 100))

    def test_list(self):
        self.assertEqual(self.client.list()[1:],
                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
                          25))
        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))

    def test_retr(self):
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy',
                     b'', b'line1', b'line2', b'line3'],
                    113)
        foo = self.client.retr('foo')
        self.assertEqual(foo, expected)

    def test_too_long_lines(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd,
                          'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))

    def test_dele(self):
        self.assertOK(self.client.dele('foo'))

    def test_noop(self):
        self.assertOK(self.client.noop())

    def test_rpop(self):
        self.assertOK(self.client.rpop('foo'))

    @test_support.requires_hashdigest('md5')
    def test_apop_normal(self):
        self.assertOK(self.client.apop('foo', 'dummypassword'))

    @test_support.requires_hashdigest('md5')
    def test_apop_REDOS(self):
        # Replace welcome with very long evil welcome.
        # NB The upper bound on welcome length is currently 2048.
        # At this length, evil input makes each apop call take
        # on the order of milliseconds instead of microseconds.
        evil_welcome = b'+OK' + (b'<' * 1000000)
        with test_support.swap_attr(self.client, 'welcome', evil_welcome):
            # The evil welcome is invalid, so apop should throw.
            self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')

    def test_top(self):
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy', b'',
                     b'line1', b'line2', b'line3'],
                    113)
        self.assertEqual(self.client.top(1, 1), expected)

    def test_uidl(self):
        self.client.uidl()
        self.client.uidl('foo')

    def test_utf8_raises_if_unsupported(self):
        self.server.handler.enable_UTF8 = False
        self.assertRaises(poplib.error_proto, self.client.utf8)

    def test_utf8(self):
        self.server.handler.enable_UTF8 = True
        expected = b'+OK I know RFC6856'
        result = self.client.utf8()
        self.assertEqual(result, expected)

    def test_capa(self):
        capa = self.client.capa()
        self.assertIn('IMPLEMENTATION', capa)

    def test_quit(self):
        resp = self.client.quit()
        self.assertTrue(resp)
        self.assertIsNone(self.client.sock)
        self.assertIsNone(self.client.file)

    @requires_ssl
    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertIn('STLS', capa)

    @requires_ssl
    def test_stls(self):
        expected = b'+OK Begin TLS negotiation'
        resp = self.client.stls()
        self.assertEqual(resp, expected)

    @requires_ssl
    def test_stls_context(self):
        expected = b'+OK Begin TLS negotiation'
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(CAFILE)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        # The first client was created with self.server.host; the second
        # attempt below connects by the name "localhost" instead.
        with self.assertRaises(ssl.CertificateError):
            self.client.stls(context=ctx)
        self.client = poplib.POP3("localhost", self.server.port, timeout=3)
        resp = self.client.stls(context=ctx)
        self.assertEqual(resp, expected)


if SUPPORTS_SSL:
    from test.test_ftplib import SSLConnection

    class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
        """Handler that secures the connection before sending the greeting."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.secure_connection()
            self.set_terminator(b"\r\n")
            self.in_buffer = []
            self.push('+OK dummy pop3 server ready. <timestamp>')
            # TLS is on from the start, so STLS is neither advertised nor
            # accepted.
            self.tls_active = True
            self.tls_starting = False


@requires_ssl
class TestPOP3_SSLClass(TestPOP3Class):
    # repeat previous tests by using poplib.POP3_SSL

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.handler = DummyPOP3_SSLHandler
        self.server.start()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port)

    def test__all__(self):
        self.assertIn('POP3_SSL', poplib.__all__)

    def test_context(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # keyfile/certfile may not be combined with an explicit context.
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, keyfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, certfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, keyfile=CERTFILE,
                          certfile=CERTFILE, context=ctx)

        self.client.quit()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                      context=ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.assertIs(self.client.sock.context, ctx)
        self.assertTrue(self.client.noop().startswith(b'+OK'))

    def test_stls(self):
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertNotIn('STLS', capa)


@requires_ssl
class TestPOP3_TLSClass(TestPOP3Class):
    # repeat previous tests by using poplib.POP3.stls()

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
        self.client.stls()

    def tearDown(self):
        if self.client.file is not None and self.client.sock is not None:
            try:
                self.client.quit()
            except poplib.error_proto:
                # happens in the test_too_long_lines case; the overlong
                # response will be treated as response to QUIT and raise
                # this exception
                self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_stls(self):
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        # capa() keys are str; the former bytes key could never be found,
        # which made this check vacuous.
        self.assertNotIn('STLS', capa)


class TestTimeouts(TestCase):
    """Check how poplib.POP3 applies default and explicit socket timeouts."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=self.server,
                                       args=(self.evt, self.sock))
        self.thread.daemon = True
        self.thread.start()
        self.evt.wait()

    def tearDown(self):
        self.thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.thread = None

    def server(self, evt, serv):
        """Accept one connection on *serv*, send a greeting and close."""
        serv.listen()
        evt.set()
        try:
            conn, addr = serv.accept()
            conn.send(b"+ Hola mundo\n")
            conn.close()
        except socket.timeout:
            pass
        finally:
            serv.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            pop = poplib.POP3(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(pop.sock.gettimeout(), 30)
        pop.close()

    def testTimeoutNone(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            pop = poplib.POP3(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(pop.sock.gettimeout())
        pop.close()

    def testTimeoutValue(self):
        pop = poplib.POP3(HOST, self.port, timeout=30)
        self.assertEqual(pop.sock.gettimeout(), 30)
        pop.close()


def test_main():
    """Run every test class, checking for threads left behind."""
    tests = [TestPOP3Class, TestTimeouts,
             TestPOP3_SSLClass, TestPOP3_TLSClass]
    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        test_support.threading_cleanup(*thread_info)


if __name__ == '__main__':
    test_main()