1from test import support 2from test.support import socket_helper 3 4from contextlib import contextmanager 5import imaplib 6import os.path 7import socketserver 8import time 9import calendar 10import threading 11import socket 12 13from test.support import (verbose, 14 run_with_tz, run_with_locale, cpython_only) 15from test.support import hashlib_helper 16from test.support import threading_helper 17from test.support import warnings_helper 18import unittest 19from unittest import mock 20from datetime import datetime, timezone, timedelta 21try: 22 import ssl 23except ImportError: 24 ssl = None 25 26CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 27CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 28 29 30class TestImaplib(unittest.TestCase): 31 32 def test_Internaldate2tuple(self): 33 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1)) 34 tt = imaplib.Internaldate2tuple( 35 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")') 36 self.assertEqual(time.mktime(tt), t0) 37 tt = imaplib.Internaldate2tuple( 38 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")') 39 self.assertEqual(time.mktime(tt), t0) 40 tt = imaplib.Internaldate2tuple( 41 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")') 42 self.assertEqual(time.mktime(tt), t0) 43 44 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0') 45 def test_Internaldate2tuple_issue10941(self): 46 self.assertNotEqual(imaplib.Internaldate2tuple( 47 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), 48 imaplib.Internaldate2tuple( 49 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) 50 51 def timevalues(self): 52 return [2000000000, 2000000000.0, time.localtime(2000000000), 53 (2033, 5, 18, 5, 33, 20, -1, -1, -1), 54 (2033, 5, 18, 5, 33, 20, -1, -1, 1), 55 datetime.fromtimestamp(2000000000, 56 timezone(timedelta(0, 2 * 60 * 60))), 57 '"18-May-2033 05:33:20 +0200"'] 58 59 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') 60 # DST rules included to work around quirk where the Gnu C library may not 61 # otherwise restore the previous time zone 62 @run_with_tz('STD-1DST,M3.2.0,M11.1.0') 63 def test_Time2Internaldate(self): 64 expected = '"18-May-2033 05:33:20 +0200"' 65 66 for t in self.timevalues(): 67 internal = imaplib.Time2Internaldate(t) 68 self.assertEqual(internal, expected) 69 70 def test_that_Time2Internaldate_returns_a_result(self): 71 # Without tzset, we can check only that it successfully 72 # produces a result, not the correctness of the result itself, 73 # since the result depends on the timezone the machine is in. 74 for t in self.timevalues(): 75 imaplib.Time2Internaldate(t) 76 77 def test_imap4_host_default_value(self): 78 # Check whether the IMAP4_PORT is truly unavailable. 79 with socket.socket() as s: 80 try: 81 s.connect(('', imaplib.IMAP4_PORT)) 82 self.skipTest( 83 "Cannot run the test with local IMAP server running.") 84 except socket.error: 85 pass 86 87 # This is the exception that should be raised. 88 expected_errnos = socket_helper.get_socket_conn_refused_errs() 89 with self.assertRaises(OSError) as cm: 90 imaplib.IMAP4() 91 self.assertIn(cm.exception.errno, expected_errnos) 92 93 94if ssl: 95 class SecureTCPServer(socketserver.TCPServer): 96 97 def get_request(self): 98 newsocket, fromaddr = self.socket.accept() 99 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 100 context.load_cert_chain(CERTFILE) 101 connstream = context.wrap_socket(newsocket, server_side=True) 102 return connstream, fromaddr 103 104 IMAP4_SSL = imaplib.IMAP4_SSL 105 106else: 107 108 class SecureTCPServer: 109 pass 110 111 IMAP4_SSL = None 112 113 114class SimpleIMAPHandler(socketserver.StreamRequestHandler): 115 timeout = support.LOOPBACK_TIMEOUT 116 continuation = None 117 capabilities = '' 118 119 def setup(self): 120 super().setup() 121 self.server.is_selected = False 122 self.server.logged = None 123 124 def _send(self, message): 125 if verbose: 126 print("SENT: %r" % message.strip()) 127 self.wfile.write(message) 128 129 def _send_line(self, message): 130 self._send(message + b'\r\n') 131 132 def _send_textline(self, message): 133 self._send_line(message.encode('ASCII')) 134 135 def _send_tagged(self, tag, code, message): 136 self._send_textline(' '.join((tag, code, message))) 137 138 def handle(self): 139 # Send a welcome message. 140 self._send_textline('* OK IMAP4rev1') 141 while 1: 142 # Gather up input until we receive a line terminator or we timeout. 143 # Accumulate read(1) because it's simpler to handle the differences 144 # between naked sockets and SSL sockets. 145 line = b'' 146 while 1: 147 try: 148 part = self.rfile.read(1) 149 if part == b'': 150 # Naked sockets return empty strings.. 151 return 152 line += part 153 except OSError: 154 # ..but SSLSockets raise exceptions. 155 return 156 if line.endswith(b'\r\n'): 157 break 158 159 if verbose: 160 print('GOT: %r' % line.strip()) 161 if self.continuation: 162 try: 163 self.continuation.send(line) 164 except StopIteration: 165 self.continuation = None 166 continue 167 splitline = line.decode('ASCII').split() 168 tag = splitline[0] 169 cmd = splitline[1] 170 args = splitline[2:] 171 172 if hasattr(self, 'cmd_' + cmd): 173 continuation = getattr(self, 'cmd_' + cmd)(tag, args) 174 if continuation: 175 self.continuation = continuation 176 next(continuation) 177 else: 178 self._send_tagged(tag, 'BAD', cmd + ' unknown') 179 180 def cmd_CAPABILITY(self, tag, args): 181 caps = ('IMAP4rev1 ' + self.capabilities 182 if self.capabilities 183 else 'IMAP4rev1') 184 self._send_textline('* CAPABILITY ' + caps) 185 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 186 187 def cmd_LOGOUT(self, tag, args): 188 self.server.logged = None 189 self._send_textline('* BYE IMAP4ref1 Server logging out') 190 self._send_tagged(tag, 'OK', 'LOGOUT completed') 191 192 def cmd_LOGIN(self, tag, args): 193 self.server.logged = args[0] 194 self._send_tagged(tag, 'OK', 'LOGIN completed') 195 196 def cmd_SELECT(self, tag, args): 197 self.server.is_selected = True 198 self._send_line(b'* 2 EXISTS') 199 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 200 201 def cmd_UNSELECT(self, tag, args): 202 if self.server.is_selected: 203 self.server.is_selected = False 204 self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') 205 else: 206 self._send_tagged(tag, 'BAD', 'No mailbox selected') 207 208 209class NewIMAPTestsMixin(): 210 client = None 211 212 def _setup(self, imap_handler, connect=True): 213 """ 214 Sets up imap_handler for tests. imap_handler should inherit from either: 215 - SimpleIMAPHandler - for testing IMAP commands, 216 - socketserver.StreamRequestHandler - if raw access to stream is needed. 217 Returns (client, server). 218 """ 219 class TestTCPServer(self.server_class): 220 def handle_error(self, request, client_address): 221 """ 222 End request and raise the error if one occurs. 223 """ 224 self.close_request(request) 225 self.server_close() 226 raise 227 228 self.addCleanup(self._cleanup) 229 self.server = self.server_class((socket_helper.HOST, 0), imap_handler) 230 self.thread = threading.Thread( 231 name=self._testMethodName+'-server', 232 target=self.server.serve_forever, 233 # Short poll interval to make the test finish quickly. 234 # Time between requests is short enough that we won't wake 235 # up spuriously too many times. 236 kwargs={'poll_interval': 0.01}) 237 self.thread.daemon = True # In case this function raises. 238 self.thread.start() 239 240 if connect: 241 self.client = self.imap_class(*self.server.server_address) 242 243 return self.client, self.server 244 245 def _cleanup(self): 246 """ 247 Cleans up the test server. This method should not be called manually, 248 it is added to the cleanup queue in the _setup method already. 249 """ 250 # if logout was called already we'd raise an exception trying to 251 # shutdown the client once again 252 if self.client is not None and self.client.state != 'LOGOUT': 253 self.client.shutdown() 254 # cleanup the server 255 self.server.shutdown() 256 self.server.server_close() 257 threading_helper.join_thread(self.thread) 258 # Explicitly clear the attribute to prevent dangling thread 259 self.thread = None 260 261 def test_EOF_without_complete_welcome_message(self): 262 # http://bugs.python.org/issue5949 263 class EOFHandler(socketserver.StreamRequestHandler): 264 def handle(self): 265 self.wfile.write(b'* OK') 266 _, server = self._setup(EOFHandler, connect=False) 267 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 268 *server.server_address) 269 270 def test_line_termination(self): 271 class BadNewlineHandler(SimpleIMAPHandler): 272 def cmd_CAPABILITY(self, tag, args): 273 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 274 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 275 _, server = self._setup(BadNewlineHandler, connect=False) 276 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 277 *server.server_address) 278 279 def test_enable_raises_error_if_not_AUTH(self): 280 class EnableHandler(SimpleIMAPHandler): 281 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 282 client, _ = self._setup(EnableHandler) 283 self.assertFalse(client.utf8_enabled) 284 with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'): 285 client.enable('foo') 286 self.assertFalse(client.utf8_enabled) 287 288 def test_enable_raises_error_if_no_capability(self): 289 client, _ = self._setup(SimpleIMAPHandler) 290 with self.assertRaisesRegex(imaplib.IMAP4.error, 291 'does not support ENABLE'): 292 client.enable('foo') 293 294 def test_enable_UTF8_raises_error_if_not_supported(self): 295 client, _ = self._setup(SimpleIMAPHandler) 296 typ, data = client.login('user', 'pass') 297 self.assertEqual(typ, 'OK') 298 with self.assertRaisesRegex(imaplib.IMAP4.error, 299 'does not support ENABLE'): 300 client.enable('UTF8=ACCEPT') 301 302 def test_enable_UTF8_True_append(self): 303 class UTF8AppendServer(SimpleIMAPHandler): 304 capabilities = 'ENABLE UTF8=ACCEPT' 305 def cmd_ENABLE(self, tag, args): 306 self._send_tagged(tag, 'OK', 'ENABLE successful') 307 def cmd_AUTHENTICATE(self, tag, args): 308 self._send_textline('+') 309 self.server.response = yield 310 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 311 def cmd_APPEND(self, tag, args): 312 self._send_textline('+') 313 self.server.response = yield 314 self._send_tagged(tag, 'OK', 'okay') 315 client, server = self._setup(UTF8AppendServer) 316 self.assertEqual(client._encoding, 'ascii') 317 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 318 self.assertEqual(code, 'OK') 319 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 320 code, _ = client.enable('UTF8=ACCEPT') 321 self.assertEqual(code, 'OK') 322 self.assertEqual(client._encoding, 'utf-8') 323 msg_string = 'Subject: üñí©öðé' 324 typ, data = client.append(None, None, None, msg_string.encode('utf-8')) 325 self.assertEqual(typ, 'OK') 326 self.assertEqual(server.response, 327 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) 328 329 def test_search_disallows_charset_in_utf8_mode(self): 330 class UTF8Server(SimpleIMAPHandler): 331 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 332 def cmd_ENABLE(self, tag, args): 333 self._send_tagged(tag, 'OK', 'ENABLE successful') 334 def cmd_AUTHENTICATE(self, tag, args): 335 self._send_textline('+') 336 self.server.response = yield 337 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 338 client, _ = self._setup(UTF8Server) 339 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 340 self.assertEqual(typ, 'OK') 341 typ, _ = client.enable('UTF8=ACCEPT') 342 self.assertEqual(typ, 'OK') 343 self.assertTrue(client.utf8_enabled) 344 with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'): 345 client.search('foo', 'bar') 346 347 def test_bad_auth_name(self): 348 class MyServer(SimpleIMAPHandler): 349 def cmd_AUTHENTICATE(self, tag, args): 350 self._send_tagged(tag, 'NO', 351 'unrecognized authentication type {}'.format(args[0])) 352 client, _ = self._setup(MyServer) 353 with self.assertRaisesRegex(imaplib.IMAP4.error, 354 'unrecognized authentication type METHOD'): 355 client.authenticate('METHOD', lambda: 1) 356 357 def test_invalid_authentication(self): 358 class MyServer(SimpleIMAPHandler): 359 def cmd_AUTHENTICATE(self, tag, args): 360 self._send_textline('+') 361 self.response = yield 362 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 363 client, _ = self._setup(MyServer) 364 with self.assertRaisesRegex(imaplib.IMAP4.error, 365 r'\[AUTHENTICATIONFAILED\] invalid'): 366 client.authenticate('MYAUTH', lambda x: b'fake') 367 368 def test_valid_authentication_bytes(self): 369 class MyServer(SimpleIMAPHandler): 370 def cmd_AUTHENTICATE(self, tag, args): 371 self._send_textline('+') 372 self.server.response = yield 373 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 374 client, server = self._setup(MyServer) 375 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 376 self.assertEqual(code, 'OK') 377 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 378 379 def test_valid_authentication_plain_text(self): 380 class MyServer(SimpleIMAPHandler): 381 def cmd_AUTHENTICATE(self, tag, args): 382 self._send_textline('+') 383 self.server.response = yield 384 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 385 client, server = self._setup(MyServer) 386 code, _ = client.authenticate('MYAUTH', lambda x: 'fake') 387 self.assertEqual(code, 'OK') 388 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 389 390 @hashlib_helper.requires_hashdigest('md5', openssl=True) 391 def test_login_cram_md5_bytes(self): 392 class AuthHandler(SimpleIMAPHandler): 393 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 394 def cmd_AUTHENTICATE(self, tag, args): 395 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 396 'VzdG9uLm1jaS5uZXQ=') 397 r = yield 398 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 399 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 400 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 401 else: 402 self._send_tagged(tag, 'NO', 'No access') 403 client, _ = self._setup(AuthHandler) 404 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 405 ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") 406 self.assertEqual(ret, "OK") 407 408 @hashlib_helper.requires_hashdigest('md5', openssl=True) 409 def test_login_cram_md5_plain_text(self): 410 class AuthHandler(SimpleIMAPHandler): 411 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 412 def cmd_AUTHENTICATE(self, tag, args): 413 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 414 'VzdG9uLm1jaS5uZXQ=') 415 r = yield 416 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 417 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 418 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 419 else: 420 self._send_tagged(tag, 'NO', 'No access') 421 client, _ = self._setup(AuthHandler) 422 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 423 ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") 424 self.assertEqual(ret, "OK") 425 426 def test_aborted_authentication(self): 427 class MyServer(SimpleIMAPHandler): 428 def cmd_AUTHENTICATE(self, tag, args): 429 self._send_textline('+') 430 self.response = yield 431 if self.response == b'*\r\n': 432 self._send_tagged( 433 tag, 434 'NO', 435 '[AUTHENTICATIONFAILED] aborted') 436 else: 437 self._send_tagged(tag, 'OK', 'MYAUTH successful') 438 client, _ = self._setup(MyServer) 439 with self.assertRaisesRegex(imaplib.IMAP4.error, 440 r'\[AUTHENTICATIONFAILED\] aborted'): 441 client.authenticate('MYAUTH', lambda x: None) 442 443 @mock.patch('imaplib._MAXLINE', 10) 444 def test_linetoolong(self): 445 class TooLongHandler(SimpleIMAPHandler): 446 def handle(self): 447 # send response line longer than the limit set in the next line 448 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n') 449 _, server = self._setup(TooLongHandler, connect=False) 450 with self.assertRaisesRegex(imaplib.IMAP4.error, 451 'got more than 10 bytes'): 452 self.imap_class(*server.server_address) 453 454 def test_simple_with_statement(self): 455 _, server = self._setup(SimpleIMAPHandler, connect=False) 456 with self.imap_class(*server.server_address): 457 pass 458 459 def test_imaplib_timeout_test(self): 460 _, server = self._setup(SimpleIMAPHandler) 461 addr = server.server_address[1] 462 client = self.imap_class("localhost", addr, timeout=None) 463 self.assertEqual(client.sock.timeout, None) 464 client.shutdown() 465 client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT) 466 self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT) 467 client.shutdown() 468 with self.assertRaises(ValueError): 469 client = self.imap_class("localhost", addr, timeout=0) 470 471 def test_imaplib_timeout_functionality_test(self): 472 class TimeoutHandler(SimpleIMAPHandler): 473 def handle(self): 474 time.sleep(1) 475 SimpleIMAPHandler.handle(self) 476 477 _, server = self._setup(TimeoutHandler) 478 addr = server.server_address[1] 479 with self.assertRaises(TimeoutError): 480 client = self.imap_class("localhost", addr, timeout=0.001) 481 482 def test_with_statement(self): 483 _, server = self._setup(SimpleIMAPHandler, connect=False) 484 with self.imap_class(*server.server_address) as imap: 485 imap.login('user', 'pass') 486 self.assertEqual(server.logged, 'user') 487 self.assertIsNone(server.logged) 488 489 def test_with_statement_logout(self): 490 # It is legal to log out explicitly inside the with block 491 _, server = self._setup(SimpleIMAPHandler, connect=False) 492 with self.imap_class(*server.server_address) as imap: 493 imap.login('user', 'pass') 494 self.assertEqual(server.logged, 'user') 495 imap.logout() 496 self.assertIsNone(server.logged) 497 self.assertIsNone(server.logged) 498 499 # command tests 500 501 def test_login(self): 502 client, _ = self._setup(SimpleIMAPHandler) 503 typ, data = client.login('user', 'pass') 504 self.assertEqual(typ, 'OK') 505 self.assertEqual(data[0], b'LOGIN completed') 506 self.assertEqual(client.state, 'AUTH') 507 508 def test_logout(self): 509 client, _ = self._setup(SimpleIMAPHandler) 510 typ, data = client.login('user', 'pass') 511 self.assertEqual(typ, 'OK') 512 self.assertEqual(data[0], b'LOGIN completed') 513 typ, data = client.logout() 514 self.assertEqual(typ, 'BYE', (typ, data)) 515 self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data)) 516 self.assertEqual(client.state, 'LOGOUT') 517 518 def test_lsub(self): 519 class LsubCmd(SimpleIMAPHandler): 520 def cmd_LSUB(self, tag, args): 521 self._send_textline('* LSUB () "." directoryA') 522 return self._send_tagged(tag, 'OK', 'LSUB completed') 523 client, _ = self._setup(LsubCmd) 524 client.login('user', 'pass') 525 typ, data = client.lsub() 526 self.assertEqual(typ, 'OK') 527 self.assertEqual(data[0], b'() "." directoryA') 528 529 def test_unselect(self): 530 client, _ = self._setup(SimpleIMAPHandler) 531 client.login('user', 'pass') 532 typ, data = client.select() 533 self.assertEqual(typ, 'OK') 534 self.assertEqual(data[0], b'2') 535 536 typ, data = client.unselect() 537 self.assertEqual(typ, 'OK') 538 self.assertEqual(data[0], b'Returned to authenticated state. (Success)') 539 self.assertEqual(client.state, 'AUTH') 540 541 542class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): 543 imap_class = imaplib.IMAP4 544 server_class = socketserver.TCPServer 545 546 547@unittest.skipUnless(ssl, "SSL not available") 548class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase): 549 imap_class = IMAP4_SSL 550 server_class = SecureTCPServer 551 552 def test_ssl_raises(self): 553 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 554 self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED) 555 self.assertEqual(ssl_context.check_hostname, True) 556 ssl_context.load_verify_locations(CAFILE) 557 558 with self.assertRaisesRegex(ssl.CertificateError, 559 "IP address mismatch, certificate is not valid for " 560 "'127.0.0.1'"): 561 _, server = self._setup(SimpleIMAPHandler) 562 client = self.imap_class(*server.server_address, 563 ssl_context=ssl_context) 564 client.shutdown() 565 566 def test_ssl_verified(self): 567 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 568 ssl_context.load_verify_locations(CAFILE) 569 570 _, server = self._setup(SimpleIMAPHandler) 571 client = self.imap_class("localhost", server.server_address[1], 572 ssl_context=ssl_context) 573 client.shutdown() 574 575 # Mock the private method _connect(), so mark the test as specific 576 # to CPython stdlib 577 @cpython_only 578 def test_certfile_arg_warn(self): 579 with warnings_helper.check_warnings(('', DeprecationWarning)): 580 with mock.patch.object(self.imap_class, 'open'): 581 with mock.patch.object(self.imap_class, '_connect'): 582 self.imap_class('localhost', 143, certfile=CERTFILE) 583 584class ThreadedNetworkedTests(unittest.TestCase): 585 server_class = socketserver.TCPServer 586 imap_class = imaplib.IMAP4 587 588 def make_server(self, addr, hdlr): 589 590 class MyServer(self.server_class): 591 def handle_error(self, request, client_address): 592 self.close_request(request) 593 self.server_close() 594 raise 595 596 if verbose: 597 print("creating server") 598 server = MyServer(addr, hdlr) 599 self.assertEqual(server.server_address, server.socket.getsockname()) 600 601 if verbose: 602 print("server created") 603 print("ADDR =", addr) 604 print("CLASS =", self.server_class) 605 print("HDLR =", server.RequestHandlerClass) 606 607 t = threading.Thread( 608 name='%s serving' % self.server_class, 609 target=server.serve_forever, 610 # Short poll interval to make the test finish quickly. 611 # Time between requests is short enough that we won't wake 612 # up spuriously too many times. 613 kwargs={'poll_interval': 0.01}) 614 t.daemon = True # In case this function raises. 615 t.start() 616 if verbose: 617 print("server running") 618 return server, t 619 620 def reap_server(self, server, thread): 621 if verbose: 622 print("waiting for server") 623 server.shutdown() 624 server.server_close() 625 thread.join() 626 if verbose: 627 print("done") 628 629 @contextmanager 630 def reaped_server(self, hdlr): 631 server, thread = self.make_server((socket_helper.HOST, 0), hdlr) 632 try: 633 yield server 634 finally: 635 self.reap_server(server, thread) 636 637 @contextmanager 638 def reaped_pair(self, hdlr): 639 with self.reaped_server(hdlr) as server: 640 client = self.imap_class(*server.server_address) 641 try: 642 yield server, client 643 finally: 644 client.logout() 645 646 @threading_helper.reap_threads 647 def test_connect(self): 648 with self.reaped_server(SimpleIMAPHandler) as server: 649 client = self.imap_class(*server.server_address) 650 client.shutdown() 651 652 @threading_helper.reap_threads 653 def test_bracket_flags(self): 654 655 # This violates RFC 3501, which disallows ']' characters in tag names, 656 # but imaplib has allowed producing such tags forever, other programs 657 # also produce them (eg: OtherInbox's Organizer app as of 20140716), 658 # and Gmail, for example, accepts them and produces them. So we 659 # support them. See issue #21815. 660 661 class BracketFlagHandler(SimpleIMAPHandler): 662 663 def handle(self): 664 self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft'] 665 super().handle() 666 667 def cmd_AUTHENTICATE(self, tag, args): 668 self._send_textline('+') 669 self.server.response = yield 670 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 671 672 def cmd_SELECT(self, tag, args): 673 flag_msg = ' \\'.join(self.flags) 674 self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii')) 675 self._send_line(b'* 2 EXISTS') 676 self._send_line(b'* 0 RECENT') 677 msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' 678 % flag_msg) 679 self._send_line(msg.encode('ascii')) 680 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 681 682 def cmd_STORE(self, tag, args): 683 new_flags = args[2].strip('(').strip(')').split() 684 self.flags.extend(new_flags) 685 flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags) 686 msg = '* %s FETCH %s' % (args[0], flags_msg) 687 self._send_line(msg.encode('ascii')) 688 self._send_tagged(tag, 'OK', 'STORE completed.') 689 690 with self.reaped_pair(BracketFlagHandler) as (server, client): 691 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 692 self.assertEqual(code, 'OK') 693 self.assertEqual(server.response, b'ZmFrZQ==\r\n') 694 client.select('test') 695 typ, [data] = client.store(b'1', "+FLAGS", "[test]") 696 self.assertIn(b'[test]', data) 697 client.select('test') 698 typ, [data] = client.response('PERMANENTFLAGS') 699 self.assertIn(b'[test]', data) 700 701 @threading_helper.reap_threads 702 def test_issue5949(self): 703 704 class EOFHandler(socketserver.StreamRequestHandler): 705 def handle(self): 706 # EOF without sending a complete welcome message. 707 self.wfile.write(b'* OK') 708 709 with self.reaped_server(EOFHandler) as server: 710 self.assertRaises(imaplib.IMAP4.abort, 711 self.imap_class, *server.server_address) 712 713 @threading_helper.reap_threads 714 def test_line_termination(self): 715 716 class BadNewlineHandler(SimpleIMAPHandler): 717 718 def cmd_CAPABILITY(self, tag, args): 719 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 720 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 721 722 with self.reaped_server(BadNewlineHandler) as server: 723 self.assertRaises(imaplib.IMAP4.abort, 724 self.imap_class, *server.server_address) 725 726 class UTF8Server(SimpleIMAPHandler): 727 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 728 729 def cmd_ENABLE(self, tag, args): 730 self._send_tagged(tag, 'OK', 'ENABLE successful') 731 732 def cmd_AUTHENTICATE(self, tag, args): 733 self._send_textline('+') 734 self.server.response = yield 735 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 736 737 @threading_helper.reap_threads 738 def test_enable_raises_error_if_not_AUTH(self): 739 with self.reaped_pair(self.UTF8Server) as (server, client): 740 self.assertFalse(client.utf8_enabled) 741 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 742 self.assertFalse(client.utf8_enabled) 743 744 # XXX Also need a test that enable after SELECT raises an error. 745 746 @threading_helper.reap_threads 747 def test_enable_raises_error_if_no_capability(self): 748 class NoEnableServer(self.UTF8Server): 749 capabilities = 'AUTH' 750 with self.reaped_pair(NoEnableServer) as (server, client): 751 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 752 753 @threading_helper.reap_threads 754 def test_enable_UTF8_raises_error_if_not_supported(self): 755 class NonUTF8Server(SimpleIMAPHandler): 756 pass 757 with self.assertRaises(imaplib.IMAP4.error): 758 with self.reaped_pair(NonUTF8Server) as (server, client): 759 typ, data = client.login('user', 'pass') 760 self.assertEqual(typ, 'OK') 761 client.enable('UTF8=ACCEPT') 762 pass 763 764 @threading_helper.reap_threads 765 def test_enable_UTF8_True_append(self): 766 767 class UTF8AppendServer(self.UTF8Server): 768 def cmd_APPEND(self, tag, args): 769 self._send_textline('+') 770 self.server.response = yield 771 self._send_tagged(tag, 'OK', 'okay') 772 773 with self.reaped_pair(UTF8AppendServer) as (server, client): 774 self.assertEqual(client._encoding, 'ascii') 775 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 776 self.assertEqual(code, 'OK') 777 self.assertEqual(server.response, 778 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 779 code, _ = client.enable('UTF8=ACCEPT') 780 self.assertEqual(code, 'OK') 781 self.assertEqual(client._encoding, 'utf-8') 782 msg_string = 'Subject: üñí©öðé' 783 typ, data = client.append( 784 None, None, None, msg_string.encode('utf-8')) 785 self.assertEqual(typ, 'OK') 786 self.assertEqual( 787 server.response, 788 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8') 789 ) 790 791 # XXX also need a test that makes sure that the Literal and Untagged_status 792 # regexes uses unicode in UTF8 mode instead of the default ASCII. 793 794 @threading_helper.reap_threads 795 def test_search_disallows_charset_in_utf8_mode(self): 796 with self.reaped_pair(self.UTF8Server) as (server, client): 797 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 798 self.assertEqual(typ, 'OK') 799 typ, _ = client.enable('UTF8=ACCEPT') 800 self.assertEqual(typ, 'OK') 801 self.assertTrue(client.utf8_enabled) 802 self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar') 803 804 @threading_helper.reap_threads 805 def test_bad_auth_name(self): 806 807 class MyServer(SimpleIMAPHandler): 808 809 def cmd_AUTHENTICATE(self, tag, args): 810 self._send_tagged(tag, 'NO', 'unrecognized authentication ' 811 'type {}'.format(args[0])) 812 813 with self.reaped_pair(MyServer) as (server, client): 814 with self.assertRaises(imaplib.IMAP4.error): 815 client.authenticate('METHOD', lambda: 1) 816 817 @threading_helper.reap_threads 818 def test_invalid_authentication(self): 819 820 class MyServer(SimpleIMAPHandler): 821 822 def cmd_AUTHENTICATE(self, tag, args): 823 self._send_textline('+') 824 self.response = yield 825 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 826 827 with self.reaped_pair(MyServer) as (server, client): 828 with self.assertRaises(imaplib.IMAP4.error): 829 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 830 831 @threading_helper.reap_threads 832 def test_valid_authentication(self): 833 834 class MyServer(SimpleIMAPHandler): 835 836 def cmd_AUTHENTICATE(self, tag, args): 837 self._send_textline('+') 838 self.server.response = yield 839 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 840 841 with self.reaped_pair(MyServer) as (server, client): 842 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 843 self.assertEqual(code, 'OK') 844 self.assertEqual(server.response, 845 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 846 847 with self.reaped_pair(MyServer) as (server, client): 848 code, data = client.authenticate('MYAUTH', lambda x: 'fake') 849 self.assertEqual(code, 'OK') 850 self.assertEqual(server.response, 851 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 852 853 @threading_helper.reap_threads 854 @hashlib_helper.requires_hashdigest('md5', openssl=True) 855 def test_login_cram_md5(self): 856 857 class AuthHandler(SimpleIMAPHandler): 858 859 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 860 861 def cmd_AUTHENTICATE(self, tag, args): 862 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 863 'VzdG9uLm1jaS5uZXQ=') 864 r = yield 865 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 866 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 867 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 868 else: 869 self._send_tagged(tag, 'NO', 'No access') 870 871 with self.reaped_pair(AuthHandler) as (server, client): 872 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 873 ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf") 874 self.assertEqual(ret, "OK") 875 876 with self.reaped_pair(AuthHandler) as (server, client): 877 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 878 ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf") 879 self.assertEqual(ret, "OK") 880 881 882 @threading_helper.reap_threads 883 def test_aborted_authentication(self): 884 885 class MyServer(SimpleIMAPHandler): 886 887 def cmd_AUTHENTICATE(self, tag, args): 888 self._send_textline('+') 889 self.response = yield 890 891 if self.response == b'*\r\n': 892 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted') 893 else: 894 self._send_tagged(tag, 'OK', 'MYAUTH successful') 895 896 with self.reaped_pair(MyServer) as (server, client): 897 with self.assertRaises(imaplib.IMAP4.error): 898 code, data = client.authenticate('MYAUTH', lambda x: None) 899 900 901 def test_linetoolong(self): 902 class TooLongHandler(SimpleIMAPHandler): 903 def handle(self): 904 # Send a very long response line 905 self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') 906 907 with self.reaped_server(TooLongHandler) as server: 908 self.assertRaises(imaplib.IMAP4.error, 909 self.imap_class, *server.server_address) 910 911 @threading_helper.reap_threads 912 def test_simple_with_statement(self): 913 # simplest call 914 with self.reaped_server(SimpleIMAPHandler) as server: 915 with self.imap_class(*server.server_address): 916 pass 917 918 @threading_helper.reap_threads 919 def test_with_statement(self): 920 with self.reaped_server(SimpleIMAPHandler) as server: 921 with self.imap_class(*server.server_address) as imap: 922 imap.login('user', 'pass') 923 self.assertEqual(server.logged, 'user') 924 self.assertIsNone(server.logged) 925 926 @threading_helper.reap_threads 927 def test_with_statement_logout(self): 928 # what happens if already logout in the block? 929 with self.reaped_server(SimpleIMAPHandler) as server: 930 with self.imap_class(*server.server_address) as imap: 931 imap.login('user', 'pass') 932 self.assertEqual(server.logged, 'user') 933 imap.logout() 934 self.assertIsNone(server.logged) 935 self.assertIsNone(server.logged) 936 937 @threading_helper.reap_threads 938 @cpython_only 939 def test_dump_ur(self): 940 # See: http://bugs.python.org/issue26543 941 untagged_resp_dict = {'READ-WRITE': [b'']} 942 943 with self.reaped_server(SimpleIMAPHandler) as server: 944 with self.imap_class(*server.server_address) as imap: 945 with mock.patch.object(imap, '_mesg') as mock_mesg: 946 imap._dump_ur(untagged_resp_dict) 947 mock_mesg.assert_called_with( 948 "untagged responses dump:READ-WRITE: [b'']" 949 ) 950 951 952@unittest.skipUnless(ssl, "SSL not available") 953class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): 954 server_class = SecureTCPServer 955 imap_class = IMAP4_SSL 956 957 @threading_helper.reap_threads 958 def test_ssl_verified(self): 959 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 960 ssl_context.load_verify_locations(CAFILE) 961 962 with self.assertRaisesRegex( 963 ssl.CertificateError, 964 "IP address mismatch, certificate is not valid for " 965 "'127.0.0.1'"): 966 with self.reaped_server(SimpleIMAPHandler) as server: 967 client = self.imap_class(*server.server_address, 968 ssl_context=ssl_context) 969 client.shutdown() 970 971 with self.reaped_server(SimpleIMAPHandler) as server: 972 client = self.imap_class("localhost", server.server_address[1], 973 ssl_context=ssl_context) 974 client.shutdown() 975 976 977@unittest.skipUnless( 978 support.is_resource_enabled('network'), 'network resource disabled') 979@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 980class RemoteIMAPTest(unittest.TestCase): 981 host = 'cyrus.andrew.cmu.edu' 982 port = 143 983 username = 'anonymous' 984 password = 'pass' 985 imap_class = imaplib.IMAP4 986 987 def setUp(self): 988 with socket_helper.transient_internet(self.host): 989 self.server = self.imap_class(self.host, self.port) 990 991 def tearDown(self): 992 if self.server is not None: 993 with socket_helper.transient_internet(self.host): 994 self.server.logout() 995 996 def test_logincapa(self): 997 with socket_helper.transient_internet(self.host): 998 for cap in self.server.capabilities: 999 self.assertIsInstance(cap, str) 1000 self.assertIn('LOGINDISABLED', self.server.capabilities) 1001 self.assertIn('AUTH=ANONYMOUS', self.server.capabilities) 1002 rs = self.server.login(self.username, self.password) 1003 self.assertEqual(rs[0], 'OK') 1004 1005 def test_logout(self): 1006 with socket_helper.transient_internet(self.host): 1007 rs = self.server.logout() 1008 self.server = None 1009 self.assertEqual(rs[0], 'BYE', rs) 1010 1011 1012@unittest.skipUnless(ssl, "SSL not available") 1013@unittest.skipUnless( 1014 support.is_resource_enabled('network'), 'network resource disabled') 1015@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 1016class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): 1017 1018 def setUp(self): 1019 super().setUp() 1020 with socket_helper.transient_internet(self.host): 1021 rs = self.server.starttls() 1022 self.assertEqual(rs[0], 'OK') 1023 1024 def test_logincapa(self): 1025 for cap in self.server.capabilities: 1026 self.assertIsInstance(cap, str) 1027 self.assertNotIn('LOGINDISABLED', self.server.capabilities) 1028 1029 1030@unittest.skipUnless(ssl, "SSL not available") 1031@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 1032class RemoteIMAP_SSLTest(RemoteIMAPTest): 1033 port = 993 1034 imap_class = IMAP4_SSL 1035 1036 def setUp(self): 1037 pass 1038 1039 def tearDown(self): 1040 pass 1041 1042 def create_ssl_context(self): 1043 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1044 ssl_context.check_hostname = False 1045 ssl_context.verify_mode = ssl.CERT_NONE 1046 ssl_context.load_cert_chain(CERTFILE) 1047 return ssl_context 1048 1049 def check_logincapa(self, server): 1050 try: 1051 for cap in server.capabilities: 1052 self.assertIsInstance(cap, str) 1053 self.assertNotIn('LOGINDISABLED', server.capabilities) 1054 self.assertIn('AUTH=PLAIN', server.capabilities) 1055 rs = server.login(self.username, self.password) 1056 self.assertEqual(rs[0], 'OK') 1057 finally: 1058 server.logout() 1059 1060 def test_logincapa(self): 1061 with socket_helper.transient_internet(self.host): 1062 _server = self.imap_class(self.host, self.port) 1063 self.check_logincapa(_server) 1064 1065 def test_logout(self): 1066 with socket_helper.transient_internet(self.host): 1067 _server = self.imap_class(self.host, self.port) 1068 rs = _server.logout() 1069 self.assertEqual(rs[0], 'BYE', rs) 1070 1071 def test_ssl_context_certfile_exclusive(self): 1072 with socket_helper.transient_internet(self.host): 1073 self.assertRaises( 1074 ValueError, self.imap_class, self.host, self.port, 1075 certfile=CERTFILE, ssl_context=self.create_ssl_context()) 1076 1077 def test_ssl_context_keyfile_exclusive(self): 1078 with socket_helper.transient_internet(self.host): 1079 self.assertRaises( 1080 ValueError, self.imap_class, self.host, self.port, 1081 keyfile=CERTFILE, ssl_context=self.create_ssl_context()) 1082 1083 1084if __name__ == "__main__": 1085 unittest.main() 1086