1"""Test script for poplib module.""" 2 3# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL 4# a real test suite 5 6import poplib 7import asyncore 8import asynchat 9import socket 10import os 11import errno 12 13from unittest import TestCase, skipUnless 14from test import support as test_support 15threading = test_support.import_module('threading') 16 17HOST = test_support.HOST 18PORT = 0 19 20SUPPORTS_SSL = False 21if hasattr(poplib, 'POP3_SSL'): 22 import ssl 23 24 SUPPORTS_SSL = True 25 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 26 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 27 28requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') 29 30# the dummy data returned by server when LIST and RETR commands are issued 31LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' 32RETR_RESP = b"""From: postmaster@python.org\ 33\r\nContent-Type: text/plain\r\n\ 34MIME-Version: 1.0\r\n\ 35Subject: Dummy\r\n\ 36\r\n\ 37line1\r\n\ 38line2\r\n\ 39line3\r\n\ 40.\r\n""" 41 42 43class DummyPOP3Handler(asynchat.async_chat): 44 45 CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']} 46 enable_UTF8 = False 47 48 def __init__(self, conn): 49 asynchat.async_chat.__init__(self, conn) 50 self.set_terminator(b"\r\n") 51 self.in_buffer = [] 52 self.push('+OK dummy pop3 server ready. <timestamp>') 53 self.tls_active = False 54 self.tls_starting = False 55 56 def collect_incoming_data(self, data): 57 self.in_buffer.append(data) 58 59 def found_terminator(self): 60 line = b''.join(self.in_buffer) 61 line = str(line, 'ISO-8859-1') 62 self.in_buffer = [] 63 cmd = line.split(' ')[0].lower() 64 space = line.find(' ') 65 if space != -1: 66 arg = line[space + 1:] 67 else: 68 arg = "" 69 if hasattr(self, 'cmd_' + cmd): 70 method = getattr(self, 'cmd_' + cmd) 71 method(arg) 72 else: 73 self.push('-ERR unrecognized POP3 command "%s".' %cmd) 74 75 def handle_error(self): 76 raise 77 78 def push(self, data): 79 asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') 80 81 def cmd_echo(self, arg): 82 # sends back the received string (used by the test suite) 83 self.push(arg) 84 85 def cmd_user(self, arg): 86 if arg != "guido": 87 self.push("-ERR no such user") 88 self.push('+OK password required') 89 90 def cmd_pass(self, arg): 91 if arg != "python": 92 self.push("-ERR wrong password") 93 self.push('+OK 10 messages') 94 95 def cmd_stat(self, arg): 96 self.push('+OK 10 100') 97 98 def cmd_list(self, arg): 99 if arg: 100 self.push('+OK %s %s' % (arg, arg)) 101 else: 102 self.push('+OK') 103 asynchat.async_chat.push(self, LIST_RESP) 104 105 cmd_uidl = cmd_list 106 107 def cmd_retr(self, arg): 108 self.push('+OK %s bytes' %len(RETR_RESP)) 109 asynchat.async_chat.push(self, RETR_RESP) 110 111 cmd_top = cmd_retr 112 113 def cmd_dele(self, arg): 114 self.push('+OK message marked for deletion.') 115 116 def cmd_noop(self, arg): 117 self.push('+OK done nothing.') 118 119 def cmd_rpop(self, arg): 120 self.push('+OK done nothing.') 121 122 def cmd_apop(self, arg): 123 self.push('+OK done nothing.') 124 125 def cmd_quit(self, arg): 126 self.push('+OK closing.') 127 self.close_when_done() 128 129 def _get_capas(self): 130 _capas = dict(self.CAPAS) 131 if not self.tls_active and SUPPORTS_SSL: 132 _capas['STLS'] = [] 133 return _capas 134 135 def cmd_capa(self, arg): 136 self.push('+OK Capability list follows') 137 if self._get_capas(): 138 for cap, params in self._get_capas().items(): 139 _ln = [cap] 140 if params: 141 _ln.extend(params) 142 self.push(' '.join(_ln)) 143 self.push('.') 144 145 def cmd_utf8(self, arg): 146 self.push('+OK I know RFC6856' 147 if self.enable_UTF8 148 else '-ERR What is UTF8?!') 149 150 if SUPPORTS_SSL: 151 152 def cmd_stls(self, arg): 153 if self.tls_active is False: 154 self.push('+OK Begin TLS negotiation') 155 context = ssl.SSLContext() 156 context.load_cert_chain(CERTFILE) 157 tls_sock = context.wrap_socket(self.socket, 158 server_side=True, 159 do_handshake_on_connect=False, 160 suppress_ragged_eofs=False) 161 self.del_channel() 162 self.set_socket(tls_sock) 163 self.tls_active = True 164 self.tls_starting = True 165 self.in_buffer = [] 166 self._do_tls_handshake() 167 else: 168 self.push('-ERR Command not permitted when TLS active') 169 170 def _do_tls_handshake(self): 171 try: 172 self.socket.do_handshake() 173 except ssl.SSLError as err: 174 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 175 ssl.SSL_ERROR_WANT_WRITE): 176 return 177 elif err.args[0] == ssl.SSL_ERROR_EOF: 178 return self.handle_close() 179 raise 180 except OSError as err: 181 if err.args[0] == errno.ECONNABORTED: 182 return self.handle_close() 183 else: 184 self.tls_active = True 185 self.tls_starting = False 186 187 def handle_read(self): 188 if self.tls_starting: 189 self._do_tls_handshake() 190 else: 191 try: 192 asynchat.async_chat.handle_read(self) 193 except ssl.SSLEOFError: 194 self.handle_close() 195 196class DummyPOP3Server(asyncore.dispatcher, threading.Thread): 197 198 handler = DummyPOP3Handler 199 200 def __init__(self, address, af=socket.AF_INET): 201 threading.Thread.__init__(self) 202 asyncore.dispatcher.__init__(self) 203 self.create_socket(af, socket.SOCK_STREAM) 204 self.bind(address) 205 self.listen(5) 206 self.active = False 207 self.active_lock = threading.Lock() 208 self.host, self.port = self.socket.getsockname()[:2] 209 self.handler_instance = None 210 211 def start(self): 212 assert not self.active 213 self.__flag = threading.Event() 214 threading.Thread.start(self) 215 self.__flag.wait() 216 217 def run(self): 218 self.active = True 219 self.__flag.set() 220 while self.active and asyncore.socket_map: 221 self.active_lock.acquire() 222 asyncore.loop(timeout=0.1, count=1) 223 self.active_lock.release() 224 asyncore.close_all(ignore_all=True) 225 226 def stop(self): 227 assert self.active 228 self.active = False 229 self.join() 230 231 def handle_accepted(self, conn, addr): 232 self.handler_instance = self.handler(conn) 233 234 def handle_connect(self): 235 self.close() 236 handle_read = handle_connect 237 238 def writable(self): 239 return 0 240 241 def handle_error(self): 242 raise 243 244 245class TestPOP3Class(TestCase): 246 def assertOK(self, resp): 247 self.assertTrue(resp.startswith(b"+OK")) 248 249 def setUp(self): 250 self.server = DummyPOP3Server((HOST, PORT)) 251 self.server.start() 252 self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) 253 254 def tearDown(self): 255 self.client.close() 256 self.server.stop() 257 258 def test_getwelcome(self): 259 self.assertEqual(self.client.getwelcome(), 260 b'+OK dummy pop3 server ready. <timestamp>') 261 262 def test_exceptions(self): 263 self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') 264 265 def test_user(self): 266 self.assertOK(self.client.user('guido')) 267 self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 268 269 def test_pass_(self): 270 self.assertOK(self.client.pass_('python')) 271 self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 272 273 def test_stat(self): 274 self.assertEqual(self.client.stat(), (10, 100)) 275 276 def test_list(self): 277 self.assertEqual(self.client.list()[1:], 278 ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'], 279 25)) 280 self.assertTrue(self.client.list('1').endswith(b"OK 1 1")) 281 282 def test_retr(self): 283 expected = (b'+OK 116 bytes', 284 [b'From: postmaster@python.org', b'Content-Type: text/plain', 285 b'MIME-Version: 1.0', b'Subject: Dummy', 286 b'', b'line1', b'line2', b'line3'], 287 113) 288 foo = self.client.retr('foo') 289 self.assertEqual(foo, expected) 290 291 def test_too_long_lines(self): 292 self.assertRaises(poplib.error_proto, self.client._shortcmd, 293 'echo +%s' % ((poplib._MAXLINE + 10) * 'a')) 294 295 def test_dele(self): 296 self.assertOK(self.client.dele('foo')) 297 298 def test_noop(self): 299 self.assertOK(self.client.noop()) 300 301 def test_rpop(self): 302 self.assertOK(self.client.rpop('foo')) 303 304 def test_apop(self): 305 self.assertOK(self.client.apop('foo', 'dummypassword')) 306 307 def test_top(self): 308 expected = (b'+OK 116 bytes', 309 [b'From: postmaster@python.org', b'Content-Type: text/plain', 310 b'MIME-Version: 1.0', b'Subject: Dummy', b'', 311 b'line1', b'line2', b'line3'], 312 113) 313 self.assertEqual(self.client.top(1, 1), expected) 314 315 def test_uidl(self): 316 self.client.uidl() 317 self.client.uidl('foo') 318 319 def test_utf8_raises_if_unsupported(self): 320 self.server.handler.enable_UTF8 = False 321 self.assertRaises(poplib.error_proto, self.client.utf8) 322 323 def test_utf8(self): 324 self.server.handler.enable_UTF8 = True 325 expected = b'+OK I know RFC6856' 326 result = self.client.utf8() 327 self.assertEqual(result, expected) 328 329 def test_capa(self): 330 capa = self.client.capa() 331 self.assertTrue('IMPLEMENTATION' in capa.keys()) 332 333 def test_quit(self): 334 resp = self.client.quit() 335 self.assertTrue(resp) 336 self.assertIsNone(self.client.sock) 337 self.assertIsNone(self.client.file) 338 339 @requires_ssl 340 def test_stls_capa(self): 341 capa = self.client.capa() 342 self.assertTrue('STLS' in capa.keys()) 343 344 @requires_ssl 345 def test_stls(self): 346 expected = b'+OK Begin TLS negotiation' 347 resp = self.client.stls() 348 self.assertEqual(resp, expected) 349 350 @requires_ssl 351 def test_stls_context(self): 352 expected = b'+OK Begin TLS negotiation' 353 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 354 ctx.load_verify_locations(CAFILE) 355 ctx.verify_mode = ssl.CERT_REQUIRED 356 ctx.check_hostname = True 357 with self.assertRaises(ssl.CertificateError): 358 resp = self.client.stls(context=ctx) 359 self.client = poplib.POP3("localhost", self.server.port, timeout=3) 360 resp = self.client.stls(context=ctx) 361 self.assertEqual(resp, expected) 362 363 364if SUPPORTS_SSL: 365 from test.test_ftplib import SSLConnection 366 367 class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler): 368 369 def __init__(self, conn): 370 asynchat.async_chat.__init__(self, conn) 371 self.secure_connection() 372 self.set_terminator(b"\r\n") 373 self.in_buffer = [] 374 self.push('+OK dummy pop3 server ready. <timestamp>') 375 self.tls_active = True 376 self.tls_starting = False 377 378 379@requires_ssl 380class TestPOP3_SSLClass(TestPOP3Class): 381 # repeat previous tests by using poplib.POP3_SSL 382 383 def setUp(self): 384 self.server = DummyPOP3Server((HOST, PORT)) 385 self.server.handler = DummyPOP3_SSLHandler 386 self.server.start() 387 self.client = poplib.POP3_SSL(self.server.host, self.server.port) 388 389 def test__all__(self): 390 self.assertIn('POP3_SSL', poplib.__all__) 391 392 def test_context(self): 393 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 394 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 395 self.server.port, keyfile=CERTFILE, context=ctx) 396 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 397 self.server.port, certfile=CERTFILE, context=ctx) 398 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 399 self.server.port, keyfile=CERTFILE, 400 certfile=CERTFILE, context=ctx) 401 402 self.client.quit() 403 self.client = poplib.POP3_SSL(self.server.host, self.server.port, 404 context=ctx) 405 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 406 self.assertIs(self.client.sock.context, ctx) 407 self.assertTrue(self.client.noop().startswith(b'+OK')) 408 409 def test_stls(self): 410 self.assertRaises(poplib.error_proto, self.client.stls) 411 412 test_stls_context = test_stls 413 414 def test_stls_capa(self): 415 capa = self.client.capa() 416 self.assertFalse('STLS' in capa.keys()) 417 418 419@requires_ssl 420class TestPOP3_TLSClass(TestPOP3Class): 421 # repeat previous tests by using poplib.POP3.stls() 422 423 def setUp(self): 424 self.server = DummyPOP3Server((HOST, PORT)) 425 self.server.start() 426 self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) 427 self.client.stls() 428 429 def tearDown(self): 430 if self.client.file is not None and self.client.sock is not None: 431 try: 432 self.client.quit() 433 except poplib.error_proto: 434 # happens in the test_too_long_lines case; the overlong 435 # response will be treated as response to QUIT and raise 436 # this exception 437 self.client.close() 438 self.server.stop() 439 440 def test_stls(self): 441 self.assertRaises(poplib.error_proto, self.client.stls) 442 443 test_stls_context = test_stls 444 445 def test_stls_capa(self): 446 capa = self.client.capa() 447 self.assertFalse(b'STLS' in capa.keys()) 448 449 450class TestTimeouts(TestCase): 451 452 def setUp(self): 453 self.evt = threading.Event() 454 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 455 self.sock.settimeout(60) # Safety net. Look issue 11812 456 self.port = test_support.bind_port(self.sock) 457 self.thread = threading.Thread(target=self.server, args=(self.evt,self.sock)) 458 self.thread.setDaemon(True) 459 self.thread.start() 460 self.evt.wait() 461 462 def tearDown(self): 463 self.thread.join() 464 del self.thread # Clear out any dangling Thread objects. 465 466 def server(self, evt, serv): 467 serv.listen() 468 evt.set() 469 try: 470 conn, addr = serv.accept() 471 conn.send(b"+ Hola mundo\n") 472 conn.close() 473 except socket.timeout: 474 pass 475 finally: 476 serv.close() 477 478 def testTimeoutDefault(self): 479 self.assertIsNone(socket.getdefaulttimeout()) 480 socket.setdefaulttimeout(30) 481 try: 482 pop = poplib.POP3(HOST, self.port) 483 finally: 484 socket.setdefaulttimeout(None) 485 self.assertEqual(pop.sock.gettimeout(), 30) 486 pop.sock.close() 487 488 def testTimeoutNone(self): 489 self.assertIsNone(socket.getdefaulttimeout()) 490 socket.setdefaulttimeout(30) 491 try: 492 pop = poplib.POP3(HOST, self.port, timeout=None) 493 finally: 494 socket.setdefaulttimeout(None) 495 self.assertIsNone(pop.sock.gettimeout()) 496 pop.sock.close() 497 498 def testTimeoutValue(self): 499 pop = poplib.POP3(HOST, self.port, timeout=30) 500 self.assertEqual(pop.sock.gettimeout(), 30) 501 pop.sock.close() 502 503 504def test_main(): 505 tests = [TestPOP3Class, TestTimeouts, 506 TestPOP3_SSLClass, TestPOP3_TLSClass] 507 thread_info = test_support.threading_setup() 508 try: 509 test_support.run_unittest(*tests) 510 finally: 511 test_support.threading_cleanup(*thread_info) 512 513 514if __name__ == '__main__': 515 test_main() 516