1from test import support 2from test.support import socket_helper 3 4from contextlib import contextmanager 5import imaplib 6import os.path 7import socketserver 8import time 9import calendar 10import threading 11import socket 12 13from test.support import (reap_threads, verbose, 14 run_with_tz, run_with_locale, cpython_only) 15from test.support import hashlib_helper 16import unittest 17from unittest import mock 18from datetime import datetime, timezone, timedelta 19try: 20 import ssl 21except ImportError: 22 ssl = None 23 24CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 25CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 26 27 28class TestImaplib(unittest.TestCase): 29 30 def test_Internaldate2tuple(self): 31 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1)) 32 tt = imaplib.Internaldate2tuple( 33 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")') 34 self.assertEqual(time.mktime(tt), t0) 35 tt = imaplib.Internaldate2tuple( 36 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")') 37 self.assertEqual(time.mktime(tt), t0) 38 tt = imaplib.Internaldate2tuple( 39 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")') 40 self.assertEqual(time.mktime(tt), t0) 41 42 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0') 43 def test_Internaldate2tuple_issue10941(self): 44 self.assertNotEqual(imaplib.Internaldate2tuple( 45 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), 46 imaplib.Internaldate2tuple( 47 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) 48 49 def timevalues(self): 50 return [2000000000, 2000000000.0, time.localtime(2000000000), 51 (2033, 5, 18, 5, 33, 20, -1, -1, -1), 52 (2033, 5, 18, 5, 33, 20, -1, -1, 1), 53 datetime.fromtimestamp(2000000000, 54 timezone(timedelta(0, 2 * 60 * 60))), 55 '"18-May-2033 05:33:20 +0200"'] 56 57 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') 58 # DST rules included to work around quirk where the Gnu C library may not 59 # otherwise restore the previous time zone 60 @run_with_tz('STD-1DST,M3.2.0,M11.1.0') 61 def test_Time2Internaldate(self): 62 expected = '"18-May-2033 05:33:20 +0200"' 63 64 for t in self.timevalues(): 65 internal = imaplib.Time2Internaldate(t) 66 self.assertEqual(internal, expected) 67 68 def test_that_Time2Internaldate_returns_a_result(self): 69 # Without tzset, we can check only that it successfully 70 # produces a result, not the correctness of the result itself, 71 # since the result depends on the timezone the machine is in. 72 for t in self.timevalues(): 73 imaplib.Time2Internaldate(t) 74 75 def test_imap4_host_default_value(self): 76 # Check whether the IMAP4_PORT is truly unavailable. 77 with socket.socket() as s: 78 try: 79 s.connect(('', imaplib.IMAP4_PORT)) 80 self.skipTest( 81 "Cannot run the test with local IMAP server running.") 82 except socket.error: 83 pass 84 85 # This is the exception that should be raised. 86 expected_errnos = socket_helper.get_socket_conn_refused_errs() 87 with self.assertRaises(OSError) as cm: 88 imaplib.IMAP4() 89 self.assertIn(cm.exception.errno, expected_errnos) 90 91 92if ssl: 93 class SecureTCPServer(socketserver.TCPServer): 94 95 def get_request(self): 96 newsocket, fromaddr = self.socket.accept() 97 context = ssl.SSLContext() 98 context.load_cert_chain(CERTFILE) 99 connstream = context.wrap_socket(newsocket, server_side=True) 100 return connstream, fromaddr 101 102 IMAP4_SSL = imaplib.IMAP4_SSL 103 104else: 105 106 class SecureTCPServer: 107 pass 108 109 IMAP4_SSL = None 110 111 112class SimpleIMAPHandler(socketserver.StreamRequestHandler): 113 timeout = support.LOOPBACK_TIMEOUT 114 continuation = None 115 capabilities = '' 116 117 def setup(self): 118 super().setup() 119 self.server.is_selected = False 120 self.server.logged = None 121 122 def _send(self, message): 123 if verbose: 124 print("SENT: %r" % message.strip()) 125 self.wfile.write(message) 126 127 def _send_line(self, message): 128 self._send(message + b'\r\n') 129 130 def _send_textline(self, message): 131 self._send_line(message.encode('ASCII')) 132 133 def _send_tagged(self, tag, code, message): 134 self._send_textline(' '.join((tag, code, message))) 135 136 def handle(self): 137 # Send a welcome message. 138 self._send_textline('* OK IMAP4rev1') 139 while 1: 140 # Gather up input until we receive a line terminator or we timeout. 141 # Accumulate read(1) because it's simpler to handle the differences 142 # between naked sockets and SSL sockets. 143 line = b'' 144 while 1: 145 try: 146 part = self.rfile.read(1) 147 if part == b'': 148 # Naked sockets return empty strings.. 149 return 150 line += part 151 except OSError: 152 # ..but SSLSockets raise exceptions. 153 return 154 if line.endswith(b'\r\n'): 155 break 156 157 if verbose: 158 print('GOT: %r' % line.strip()) 159 if self.continuation: 160 try: 161 self.continuation.send(line) 162 except StopIteration: 163 self.continuation = None 164 continue 165 splitline = line.decode('ASCII').split() 166 tag = splitline[0] 167 cmd = splitline[1] 168 args = splitline[2:] 169 170 if hasattr(self, 'cmd_' + cmd): 171 continuation = getattr(self, 'cmd_' + cmd)(tag, args) 172 if continuation: 173 self.continuation = continuation 174 next(continuation) 175 else: 176 self._send_tagged(tag, 'BAD', cmd + ' unknown') 177 178 def cmd_CAPABILITY(self, tag, args): 179 caps = ('IMAP4rev1 ' + self.capabilities 180 if self.capabilities 181 else 'IMAP4rev1') 182 self._send_textline('* CAPABILITY ' + caps) 183 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 184 185 def cmd_LOGOUT(self, tag, args): 186 self.server.logged = None 187 self._send_textline('* BYE IMAP4ref1 Server logging out') 188 self._send_tagged(tag, 'OK', 'LOGOUT completed') 189 190 def cmd_LOGIN(self, tag, args): 191 self.server.logged = args[0] 192 self._send_tagged(tag, 'OK', 'LOGIN completed') 193 194 def cmd_SELECT(self, tag, args): 195 self.server.is_selected = True 196 self._send_line(b'* 2 EXISTS') 197 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 198 199 def cmd_UNSELECT(self, tag, args): 200 if self.server.is_selected: 201 self.server.is_selected = False 202 self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') 203 else: 204 self._send_tagged(tag, 'BAD', 'No mailbox selected') 205 206 207class NewIMAPTestsMixin(): 208 client = None 209 210 def _setup(self, imap_handler, connect=True): 211 """ 212 Sets up imap_handler for tests. imap_handler should inherit from either: 213 - SimpleIMAPHandler - for testing IMAP commands, 214 - socketserver.StreamRequestHandler - if raw access to stream is needed. 215 Returns (client, server). 216 """ 217 class TestTCPServer(self.server_class): 218 def handle_error(self, request, client_address): 219 """ 220 End request and raise the error if one occurs. 221 """ 222 self.close_request(request) 223 self.server_close() 224 raise 225 226 self.addCleanup(self._cleanup) 227 self.server = self.server_class((socket_helper.HOST, 0), imap_handler) 228 self.thread = threading.Thread( 229 name=self._testMethodName+'-server', 230 target=self.server.serve_forever, 231 # Short poll interval to make the test finish quickly. 232 # Time between requests is short enough that we won't wake 233 # up spuriously too many times. 234 kwargs={'poll_interval': 0.01}) 235 self.thread.daemon = True # In case this function raises. 236 self.thread.start() 237 238 if connect: 239 self.client = self.imap_class(*self.server.server_address) 240 241 return self.client, self.server 242 243 def _cleanup(self): 244 """ 245 Cleans up the test server. This method should not be called manually, 246 it is added to the cleanup queue in the _setup method already. 247 """ 248 # if logout was called already we'd raise an exception trying to 249 # shutdown the client once again 250 if self.client is not None and self.client.state != 'LOGOUT': 251 self.client.shutdown() 252 # cleanup the server 253 self.server.shutdown() 254 self.server.server_close() 255 support.join_thread(self.thread) 256 # Explicitly clear the attribute to prevent dangling thread 257 self.thread = None 258 259 def test_EOF_without_complete_welcome_message(self): 260 # http://bugs.python.org/issue5949 261 class EOFHandler(socketserver.StreamRequestHandler): 262 def handle(self): 263 self.wfile.write(b'* OK') 264 _, server = self._setup(EOFHandler, connect=False) 265 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 266 *server.server_address) 267 268 def test_line_termination(self): 269 class BadNewlineHandler(SimpleIMAPHandler): 270 def cmd_CAPABILITY(self, tag, args): 271 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 272 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 273 _, server = self._setup(BadNewlineHandler, connect=False) 274 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 275 *server.server_address) 276 277 def test_enable_raises_error_if_not_AUTH(self): 278 class EnableHandler(SimpleIMAPHandler): 279 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 280 client, _ = self._setup(EnableHandler) 281 self.assertFalse(client.utf8_enabled) 282 with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'): 283 client.enable('foo') 284 self.assertFalse(client.utf8_enabled) 285 286 def test_enable_raises_error_if_no_capability(self): 287 client, _ = self._setup(SimpleIMAPHandler) 288 with self.assertRaisesRegex(imaplib.IMAP4.error, 289 'does not support ENABLE'): 290 client.enable('foo') 291 292 def test_enable_UTF8_raises_error_if_not_supported(self): 293 client, _ = self._setup(SimpleIMAPHandler) 294 typ, data = client.login('user', 'pass') 295 self.assertEqual(typ, 'OK') 296 with self.assertRaisesRegex(imaplib.IMAP4.error, 297 'does not support ENABLE'): 298 client.enable('UTF8=ACCEPT') 299 300 def test_enable_UTF8_True_append(self): 301 class UTF8AppendServer(SimpleIMAPHandler): 302 capabilities = 'ENABLE UTF8=ACCEPT' 303 def cmd_ENABLE(self, tag, args): 304 self._send_tagged(tag, 'OK', 'ENABLE successful') 305 def cmd_AUTHENTICATE(self, tag, args): 306 self._send_textline('+') 307 self.server.response = yield 308 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 309 def cmd_APPEND(self, tag, args): 310 self._send_textline('+') 311 self.server.response = yield 312 self._send_tagged(tag, 'OK', 'okay') 313 client, server = self._setup(UTF8AppendServer) 314 self.assertEqual(client._encoding, 'ascii') 315 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 316 self.assertEqual(code, 'OK') 317 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 318 code, _ = client.enable('UTF8=ACCEPT') 319 self.assertEqual(code, 'OK') 320 self.assertEqual(client._encoding, 'utf-8') 321 msg_string = 'Subject: üñí©öðé' 322 typ, data = client.append(None, None, None, msg_string.encode('utf-8')) 323 self.assertEqual(typ, 'OK') 324 self.assertEqual(server.response, 325 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) 326 327 def test_search_disallows_charset_in_utf8_mode(self): 328 class UTF8Server(SimpleIMAPHandler): 329 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 330 def cmd_ENABLE(self, tag, args): 331 self._send_tagged(tag, 'OK', 'ENABLE successful') 332 def cmd_AUTHENTICATE(self, tag, args): 333 self._send_textline('+') 334 self.server.response = yield 335 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 336 client, _ = self._setup(UTF8Server) 337 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 338 self.assertEqual(typ, 'OK') 339 typ, _ = client.enable('UTF8=ACCEPT') 340 self.assertEqual(typ, 'OK') 341 self.assertTrue(client.utf8_enabled) 342 with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'): 343 client.search('foo', 'bar') 344 345 def test_bad_auth_name(self): 346 class MyServer(SimpleIMAPHandler): 347 def cmd_AUTHENTICATE(self, tag, args): 348 self._send_tagged(tag, 'NO', 349 'unrecognized authentication type {}'.format(args[0])) 350 client, _ = self._setup(MyServer) 351 with self.assertRaisesRegex(imaplib.IMAP4.error, 352 'unrecognized authentication type METHOD'): 353 client.authenticate('METHOD', lambda: 1) 354 355 def test_invalid_authentication(self): 356 class MyServer(SimpleIMAPHandler): 357 def cmd_AUTHENTICATE(self, tag, args): 358 self._send_textline('+') 359 self.response = yield 360 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 361 client, _ = self._setup(MyServer) 362 with self.assertRaisesRegex(imaplib.IMAP4.error, 363 r'\[AUTHENTICATIONFAILED\] invalid'): 364 client.authenticate('MYAUTH', lambda x: b'fake') 365 366 def test_valid_authentication_bytes(self): 367 class MyServer(SimpleIMAPHandler): 368 def cmd_AUTHENTICATE(self, tag, args): 369 self._send_textline('+') 370 self.server.response = yield 371 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 372 client, server = self._setup(MyServer) 373 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 374 self.assertEqual(code, 'OK') 375 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 376 377 def test_valid_authentication_plain_text(self): 378 class MyServer(SimpleIMAPHandler): 379 def cmd_AUTHENTICATE(self, tag, args): 380 self._send_textline('+') 381 self.server.response = yield 382 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 383 client, server = self._setup(MyServer) 384 code, _ = client.authenticate('MYAUTH', lambda x: 'fake') 385 self.assertEqual(code, 'OK') 386 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 387 388 @hashlib_helper.requires_hashdigest('md5') 389 def test_login_cram_md5_bytes(self): 390 class AuthHandler(SimpleIMAPHandler): 391 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 392 def cmd_AUTHENTICATE(self, tag, args): 393 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 394 'VzdG9uLm1jaS5uZXQ=') 395 r = yield 396 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 397 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 398 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 399 else: 400 self._send_tagged(tag, 'NO', 'No access') 401 client, _ = self._setup(AuthHandler) 402 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 403 ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") 404 self.assertEqual(ret, "OK") 405 406 @hashlib_helper.requires_hashdigest('md5') 407 def test_login_cram_md5_plain_text(self): 408 class AuthHandler(SimpleIMAPHandler): 409 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 410 def cmd_AUTHENTICATE(self, tag, args): 411 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 412 'VzdG9uLm1jaS5uZXQ=') 413 r = yield 414 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 415 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 416 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 417 else: 418 self._send_tagged(tag, 'NO', 'No access') 419 client, _ = self._setup(AuthHandler) 420 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 421 ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") 422 self.assertEqual(ret, "OK") 423 424 def test_aborted_authentication(self): 425 class MyServer(SimpleIMAPHandler): 426 def cmd_AUTHENTICATE(self, tag, args): 427 self._send_textline('+') 428 self.response = yield 429 if self.response == b'*\r\n': 430 self._send_tagged( 431 tag, 432 'NO', 433 '[AUTHENTICATIONFAILED] aborted') 434 else: 435 self._send_tagged(tag, 'OK', 'MYAUTH successful') 436 client, _ = self._setup(MyServer) 437 with self.assertRaisesRegex(imaplib.IMAP4.error, 438 r'\[AUTHENTICATIONFAILED\] aborted'): 439 client.authenticate('MYAUTH', lambda x: None) 440 441 @mock.patch('imaplib._MAXLINE', 10) 442 def test_linetoolong(self): 443 class TooLongHandler(SimpleIMAPHandler): 444 def handle(self): 445 # send response line longer than the limit set in the next line 446 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n') 447 _, server = self._setup(TooLongHandler, connect=False) 448 with self.assertRaisesRegex(imaplib.IMAP4.error, 449 'got more than 10 bytes'): 450 self.imap_class(*server.server_address) 451 452 def test_simple_with_statement(self): 453 _, server = self._setup(SimpleIMAPHandler, connect=False) 454 with self.imap_class(*server.server_address): 455 pass 456 457 def test_imaplib_timeout_test(self): 458 _, server = self._setup(SimpleIMAPHandler) 459 addr = server.server_address[1] 460 client = self.imap_class("localhost", addr, timeout=None) 461 self.assertEqual(client.sock.timeout, None) 462 client.shutdown() 463 client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT) 464 self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT) 465 client.shutdown() 466 with self.assertRaises(ValueError): 467 client = self.imap_class("localhost", addr, timeout=0) 468 469 def test_imaplib_timeout_functionality_test(self): 470 class TimeoutHandler(SimpleIMAPHandler): 471 def handle(self): 472 time.sleep(1) 473 SimpleIMAPHandler.handle(self) 474 475 _, server = self._setup(TimeoutHandler) 476 addr = server.server_address[1] 477 with self.assertRaises(socket.timeout): 478 client = self.imap_class("localhost", addr, timeout=0.001) 479 480 def test_with_statement(self): 481 _, server = self._setup(SimpleIMAPHandler, connect=False) 482 with self.imap_class(*server.server_address) as imap: 483 imap.login('user', 'pass') 484 self.assertEqual(server.logged, 'user') 485 self.assertIsNone(server.logged) 486 487 def test_with_statement_logout(self): 488 # It is legal to log out explicitly inside the with block 489 _, server = self._setup(SimpleIMAPHandler, connect=False) 490 with self.imap_class(*server.server_address) as imap: 491 imap.login('user', 'pass') 492 self.assertEqual(server.logged, 'user') 493 imap.logout() 494 self.assertIsNone(server.logged) 495 self.assertIsNone(server.logged) 496 497 # command tests 498 499 def test_login(self): 500 client, _ = self._setup(SimpleIMAPHandler) 501 typ, data = client.login('user', 'pass') 502 self.assertEqual(typ, 'OK') 503 self.assertEqual(data[0], b'LOGIN completed') 504 self.assertEqual(client.state, 'AUTH') 505 506 def test_logout(self): 507 client, _ = self._setup(SimpleIMAPHandler) 508 typ, data = client.login('user', 'pass') 509 self.assertEqual(typ, 'OK') 510 self.assertEqual(data[0], b'LOGIN completed') 511 typ, data = client.logout() 512 self.assertEqual(typ, 'BYE', (typ, data)) 513 self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data)) 514 self.assertEqual(client.state, 'LOGOUT') 515 516 def test_lsub(self): 517 class LsubCmd(SimpleIMAPHandler): 518 def cmd_LSUB(self, tag, args): 519 self._send_textline('* LSUB () "." directoryA') 520 return self._send_tagged(tag, 'OK', 'LSUB completed') 521 client, _ = self._setup(LsubCmd) 522 client.login('user', 'pass') 523 typ, data = client.lsub() 524 self.assertEqual(typ, 'OK') 525 self.assertEqual(data[0], b'() "." directoryA') 526 527 def test_unselect(self): 528 client, _ = self._setup(SimpleIMAPHandler) 529 client.login('user', 'pass') 530 typ, data = client.select() 531 self.assertEqual(typ, 'OK') 532 self.assertEqual(data[0], b'2') 533 534 typ, data = client.unselect() 535 self.assertEqual(typ, 'OK') 536 self.assertEqual(data[0], b'Returned to authenticated state. (Success)') 537 self.assertEqual(client.state, 'AUTH') 538 539 540class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): 541 imap_class = imaplib.IMAP4 542 server_class = socketserver.TCPServer 543 544 545@unittest.skipUnless(ssl, "SSL not available") 546class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase): 547 imap_class = IMAP4_SSL 548 server_class = SecureTCPServer 549 550 def test_ssl_raises(self): 551 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 552 self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED) 553 self.assertEqual(ssl_context.check_hostname, True) 554 ssl_context.load_verify_locations(CAFILE) 555 556 with self.assertRaisesRegex(ssl.CertificateError, 557 "IP address mismatch, certificate is not valid for " 558 "'127.0.0.1'"): 559 _, server = self._setup(SimpleIMAPHandler) 560 client = self.imap_class(*server.server_address, 561 ssl_context=ssl_context) 562 client.shutdown() 563 564 def test_ssl_verified(self): 565 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 566 ssl_context.load_verify_locations(CAFILE) 567 568 _, server = self._setup(SimpleIMAPHandler) 569 client = self.imap_class("localhost", server.server_address[1], 570 ssl_context=ssl_context) 571 client.shutdown() 572 573 # Mock the private method _connect(), so mark the test as specific 574 # to CPython stdlib 575 @cpython_only 576 def test_certfile_arg_warn(self): 577 with support.check_warnings(('', DeprecationWarning)): 578 with mock.patch.object(self.imap_class, 'open'): 579 with mock.patch.object(self.imap_class, '_connect'): 580 self.imap_class('localhost', 143, certfile=CERTFILE) 581 582class ThreadedNetworkedTests(unittest.TestCase): 583 server_class = socketserver.TCPServer 584 imap_class = imaplib.IMAP4 585 586 def make_server(self, addr, hdlr): 587 588 class MyServer(self.server_class): 589 def handle_error(self, request, client_address): 590 self.close_request(request) 591 self.server_close() 592 raise 593 594 if verbose: 595 print("creating server") 596 server = MyServer(addr, hdlr) 597 self.assertEqual(server.server_address, server.socket.getsockname()) 598 599 if verbose: 600 print("server created") 601 print("ADDR =", addr) 602 print("CLASS =", self.server_class) 603 print("HDLR =", server.RequestHandlerClass) 604 605 t = threading.Thread( 606 name='%s serving' % self.server_class, 607 target=server.serve_forever, 608 # Short poll interval to make the test finish quickly. 609 # Time between requests is short enough that we won't wake 610 # up spuriously too many times. 611 kwargs={'poll_interval': 0.01}) 612 t.daemon = True # In case this function raises. 613 t.start() 614 if verbose: 615 print("server running") 616 return server, t 617 618 def reap_server(self, server, thread): 619 if verbose: 620 print("waiting for server") 621 server.shutdown() 622 server.server_close() 623 thread.join() 624 if verbose: 625 print("done") 626 627 @contextmanager 628 def reaped_server(self, hdlr): 629 server, thread = self.make_server((socket_helper.HOST, 0), hdlr) 630 try: 631 yield server 632 finally: 633 self.reap_server(server, thread) 634 635 @contextmanager 636 def reaped_pair(self, hdlr): 637 with self.reaped_server(hdlr) as server: 638 client = self.imap_class(*server.server_address) 639 try: 640 yield server, client 641 finally: 642 client.logout() 643 644 @reap_threads 645 def test_connect(self): 646 with self.reaped_server(SimpleIMAPHandler) as server: 647 client = self.imap_class(*server.server_address) 648 client.shutdown() 649 650 @reap_threads 651 def test_bracket_flags(self): 652 653 # This violates RFC 3501, which disallows ']' characters in tag names, 654 # but imaplib has allowed producing such tags forever, other programs 655 # also produce them (eg: OtherInbox's Organizer app as of 20140716), 656 # and Gmail, for example, accepts them and produces them. So we 657 # support them. See issue #21815. 658 659 class BracketFlagHandler(SimpleIMAPHandler): 660 661 def handle(self): 662 self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft'] 663 super().handle() 664 665 def cmd_AUTHENTICATE(self, tag, args): 666 self._send_textline('+') 667 self.server.response = yield 668 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 669 670 def cmd_SELECT(self, tag, args): 671 flag_msg = ' \\'.join(self.flags) 672 self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii')) 673 self._send_line(b'* 2 EXISTS') 674 self._send_line(b'* 0 RECENT') 675 msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' 676 % flag_msg) 677 self._send_line(msg.encode('ascii')) 678 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 679 680 def cmd_STORE(self, tag, args): 681 new_flags = args[2].strip('(').strip(')').split() 682 self.flags.extend(new_flags) 683 flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags) 684 msg = '* %s FETCH %s' % (args[0], flags_msg) 685 self._send_line(msg.encode('ascii')) 686 self._send_tagged(tag, 'OK', 'STORE completed.') 687 688 with self.reaped_pair(BracketFlagHandler) as (server, client): 689 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 690 self.assertEqual(code, 'OK') 691 self.assertEqual(server.response, b'ZmFrZQ==\r\n') 692 client.select('test') 693 typ, [data] = client.store(b'1', "+FLAGS", "[test]") 694 self.assertIn(b'[test]', data) 695 client.select('test') 696 typ, [data] = client.response('PERMANENTFLAGS') 697 self.assertIn(b'[test]', data) 698 699 @reap_threads 700 def test_issue5949(self): 701 702 class EOFHandler(socketserver.StreamRequestHandler): 703 def handle(self): 704 # EOF without sending a complete welcome message. 705 self.wfile.write(b'* OK') 706 707 with self.reaped_server(EOFHandler) as server: 708 self.assertRaises(imaplib.IMAP4.abort, 709 self.imap_class, *server.server_address) 710 711 @reap_threads 712 def test_line_termination(self): 713 714 class BadNewlineHandler(SimpleIMAPHandler): 715 716 def cmd_CAPABILITY(self, tag, args): 717 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 718 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 719 720 with self.reaped_server(BadNewlineHandler) as server: 721 self.assertRaises(imaplib.IMAP4.abort, 722 self.imap_class, *server.server_address) 723 724 class UTF8Server(SimpleIMAPHandler): 725 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 726 727 def cmd_ENABLE(self, tag, args): 728 self._send_tagged(tag, 'OK', 'ENABLE successful') 729 730 def cmd_AUTHENTICATE(self, tag, args): 731 self._send_textline('+') 732 self.server.response = yield 733 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 734 735 @reap_threads 736 def test_enable_raises_error_if_not_AUTH(self): 737 with self.reaped_pair(self.UTF8Server) as (server, client): 738 self.assertFalse(client.utf8_enabled) 739 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 740 self.assertFalse(client.utf8_enabled) 741 742 # XXX Also need a test that enable after SELECT raises an error. 743 744 @reap_threads 745 def test_enable_raises_error_if_no_capability(self): 746 class NoEnableServer(self.UTF8Server): 747 capabilities = 'AUTH' 748 with self.reaped_pair(NoEnableServer) as (server, client): 749 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 750 751 @reap_threads 752 def test_enable_UTF8_raises_error_if_not_supported(self): 753 class NonUTF8Server(SimpleIMAPHandler): 754 pass 755 with self.assertRaises(imaplib.IMAP4.error): 756 with self.reaped_pair(NonUTF8Server) as (server, client): 757 typ, data = client.login('user', 'pass') 758 self.assertEqual(typ, 'OK') 759 client.enable('UTF8=ACCEPT') 760 pass 761 762 @reap_threads 763 def test_enable_UTF8_True_append(self): 764 765 class UTF8AppendServer(self.UTF8Server): 766 def cmd_APPEND(self, tag, args): 767 self._send_textline('+') 768 self.server.response = yield 769 self._send_tagged(tag, 'OK', 'okay') 770 771 with self.reaped_pair(UTF8AppendServer) as (server, client): 772 self.assertEqual(client._encoding, 'ascii') 773 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 774 self.assertEqual(code, 'OK') 775 self.assertEqual(server.response, 776 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 777 code, _ = client.enable('UTF8=ACCEPT') 778 self.assertEqual(code, 'OK') 779 self.assertEqual(client._encoding, 'utf-8') 780 msg_string = 'Subject: üñí©öðé' 781 typ, data = client.append( 782 None, None, None, msg_string.encode('utf-8')) 783 self.assertEqual(typ, 'OK') 784 self.assertEqual( 785 server.response, 786 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8') 787 ) 788 789 # XXX also need a test that makes sure that the Literal and Untagged_status 790 # regexes uses unicode in UTF8 mode instead of the default ASCII. 791 792 @reap_threads 793 def test_search_disallows_charset_in_utf8_mode(self): 794 with self.reaped_pair(self.UTF8Server) as (server, client): 795 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 796 self.assertEqual(typ, 'OK') 797 typ, _ = client.enable('UTF8=ACCEPT') 798 self.assertEqual(typ, 'OK') 799 self.assertTrue(client.utf8_enabled) 800 self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar') 801 802 @reap_threads 803 def test_bad_auth_name(self): 804 805 class MyServer(SimpleIMAPHandler): 806 807 def cmd_AUTHENTICATE(self, tag, args): 808 self._send_tagged(tag, 'NO', 'unrecognized authentication ' 809 'type {}'.format(args[0])) 810 811 with self.reaped_pair(MyServer) as (server, client): 812 with self.assertRaises(imaplib.IMAP4.error): 813 client.authenticate('METHOD', lambda: 1) 814 815 @reap_threads 816 def test_invalid_authentication(self): 817 818 class MyServer(SimpleIMAPHandler): 819 820 def cmd_AUTHENTICATE(self, tag, args): 821 self._send_textline('+') 822 self.response = yield 823 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 824 825 with self.reaped_pair(MyServer) as (server, client): 826 with self.assertRaises(imaplib.IMAP4.error): 827 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 828 829 @reap_threads 830 def test_valid_authentication(self): 831 832 class MyServer(SimpleIMAPHandler): 833 834 def cmd_AUTHENTICATE(self, tag, args): 835 self._send_textline('+') 836 self.server.response = yield 837 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 838 839 with self.reaped_pair(MyServer) as (server, client): 840 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 841 self.assertEqual(code, 'OK') 842 self.assertEqual(server.response, 843 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 844 845 with self.reaped_pair(MyServer) as (server, client): 846 code, data = client.authenticate('MYAUTH', lambda x: 'fake') 847 self.assertEqual(code, 'OK') 848 self.assertEqual(server.response, 849 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 850 851 @reap_threads 852 @hashlib_helper.requires_hashdigest('md5') 853 def test_login_cram_md5(self): 854 855 class AuthHandler(SimpleIMAPHandler): 856 857 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 858 859 def cmd_AUTHENTICATE(self, tag, args): 860 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 861 'VzdG9uLm1jaS5uZXQ=') 862 r = yield 863 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 864 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 865 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 866 else: 867 self._send_tagged(tag, 'NO', 'No access') 868 869 with self.reaped_pair(AuthHandler) as (server, client): 870 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 871 ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf") 872 self.assertEqual(ret, "OK") 873 874 with self.reaped_pair(AuthHandler) as (server, client): 875 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 876 ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf") 877 self.assertEqual(ret, "OK") 878 879 880 @reap_threads 881 def test_aborted_authentication(self): 882 883 class MyServer(SimpleIMAPHandler): 884 885 def cmd_AUTHENTICATE(self, tag, args): 886 self._send_textline('+') 887 self.response = yield 888 889 if self.response == b'*\r\n': 890 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted') 891 else: 892 self._send_tagged(tag, 'OK', 'MYAUTH successful') 893 894 with self.reaped_pair(MyServer) as (server, client): 895 with self.assertRaises(imaplib.IMAP4.error): 896 code, data = client.authenticate('MYAUTH', lambda x: None) 897 898 899 def test_linetoolong(self): 900 class TooLongHandler(SimpleIMAPHandler): 901 def handle(self): 902 # Send a very long response line 903 self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') 904 905 with self.reaped_server(TooLongHandler) as server: 906 self.assertRaises(imaplib.IMAP4.error, 907 self.imap_class, *server.server_address) 908 909 @reap_threads 910 def test_simple_with_statement(self): 911 # simplest call 912 with self.reaped_server(SimpleIMAPHandler) as server: 913 with self.imap_class(*server.server_address): 914 pass 915 916 @reap_threads 917 def test_with_statement(self): 918 with self.reaped_server(SimpleIMAPHandler) as server: 919 with self.imap_class(*server.server_address) as imap: 920 imap.login('user', 'pass') 921 self.assertEqual(server.logged, 'user') 922 self.assertIsNone(server.logged) 923 924 @reap_threads 925 def test_with_statement_logout(self): 926 # what happens if already logout in the block? 927 with self.reaped_server(SimpleIMAPHandler) as server: 928 with self.imap_class(*server.server_address) as imap: 929 imap.login('user', 'pass') 930 self.assertEqual(server.logged, 'user') 931 imap.logout() 932 self.assertIsNone(server.logged) 933 self.assertIsNone(server.logged) 934 935 936@unittest.skipUnless(ssl, "SSL not available") 937class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): 938 server_class = SecureTCPServer 939 imap_class = IMAP4_SSL 940 941 @reap_threads 942 def test_ssl_verified(self): 943 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 944 ssl_context.load_verify_locations(CAFILE) 945 946 with self.assertRaisesRegex( 947 ssl.CertificateError, 948 "IP address mismatch, certificate is not valid for " 949 "'127.0.0.1'"): 950 with self.reaped_server(SimpleIMAPHandler) as server: 951 client = self.imap_class(*server.server_address, 952 ssl_context=ssl_context) 953 client.shutdown() 954 955 with self.reaped_server(SimpleIMAPHandler) as server: 956 client = self.imap_class("localhost", server.server_address[1], 957 ssl_context=ssl_context) 958 client.shutdown() 959 960 961@unittest.skipUnless( 962 support.is_resource_enabled('network'), 'network resource disabled') 963@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 964class RemoteIMAPTest(unittest.TestCase): 965 host = 'cyrus.andrew.cmu.edu' 966 port = 143 967 username = 'anonymous' 968 password = 'pass' 969 imap_class = imaplib.IMAP4 970 971 def setUp(self): 972 with socket_helper.transient_internet(self.host): 973 self.server = self.imap_class(self.host, self.port) 974 975 def tearDown(self): 976 if self.server is not None: 977 with socket_helper.transient_internet(self.host): 978 self.server.logout() 979 980 def test_logincapa(self): 981 with socket_helper.transient_internet(self.host): 982 for cap in self.server.capabilities: 983 self.assertIsInstance(cap, str) 984 self.assertIn('LOGINDISABLED', self.server.capabilities) 985 self.assertIn('AUTH=ANONYMOUS', self.server.capabilities) 986 rs = self.server.login(self.username, self.password) 987 self.assertEqual(rs[0], 'OK') 988 989 def test_logout(self): 990 with socket_helper.transient_internet(self.host): 991 rs = self.server.logout() 992 self.server = None 993 self.assertEqual(rs[0], 'BYE', rs) 994 995 996@unittest.skipUnless(ssl, "SSL not available") 997@unittest.skipUnless( 998 support.is_resource_enabled('network'), 'network resource disabled') 999@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 1000class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): 1001 1002 def setUp(self): 1003 super().setUp() 1004 with socket_helper.transient_internet(self.host): 1005 rs = self.server.starttls() 1006 self.assertEqual(rs[0], 'OK') 1007 1008 def test_logincapa(self): 1009 for cap in self.server.capabilities: 1010 self.assertIsInstance(cap, str) 1011 self.assertNotIn('LOGINDISABLED', self.server.capabilities) 1012 1013 1014@unittest.skipUnless(ssl, "SSL not available") 1015@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 1016class RemoteIMAP_SSLTest(RemoteIMAPTest): 1017 port = 993 1018 imap_class = IMAP4_SSL 1019 1020 def setUp(self): 1021 pass 1022 1023 def tearDown(self): 1024 pass 1025 1026 def create_ssl_context(self): 1027 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1028 ssl_context.check_hostname = False 1029 ssl_context.verify_mode = ssl.CERT_NONE 1030 ssl_context.load_cert_chain(CERTFILE) 1031 return ssl_context 1032 1033 def check_logincapa(self, server): 1034 try: 1035 for cap in server.capabilities: 1036 self.assertIsInstance(cap, str) 1037 self.assertNotIn('LOGINDISABLED', server.capabilities) 1038 self.assertIn('AUTH=PLAIN', server.capabilities) 1039 rs = server.login(self.username, self.password) 1040 self.assertEqual(rs[0], 'OK') 1041 finally: 1042 server.logout() 1043 1044 def test_logincapa(self): 1045 with socket_helper.transient_internet(self.host): 1046 _server = self.imap_class(self.host, self.port) 1047 self.check_logincapa(_server) 1048 1049 def test_logout(self): 1050 with socket_helper.transient_internet(self.host): 1051 _server = self.imap_class(self.host, self.port) 1052 rs = _server.logout() 1053 self.assertEqual(rs[0], 'BYE', rs) 1054 1055 def test_ssl_context_certfile_exclusive(self): 1056 with socket_helper.transient_internet(self.host): 1057 self.assertRaises( 1058 ValueError, self.imap_class, self.host, self.port, 1059 certfile=CERTFILE, ssl_context=self.create_ssl_context()) 1060 1061 def test_ssl_context_keyfile_exclusive(self): 1062 with socket_helper.transient_internet(self.host): 1063 self.assertRaises( 1064 ValueError, self.imap_class, self.host, self.port, 1065 keyfile=CERTFILE, ssl_context=self.create_ssl_context()) 1066 1067 1068if __name__ == "__main__": 1069 unittest.main() 1070