from test import support

from contextlib import contextmanager
import imaplib
import os.path
import socketserver
import time
import calendar
import threading
import socket

from test.support import (reap_threads, verbose, transient_internet,
                          run_with_tz, run_with_locale, cpython_only,
                          requires_hashdigest)
import unittest
from unittest import mock
from datetime import datetime, timezone, timedelta
try:
    import ssl
except ImportError:
    ssl = None

CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")


class TestImaplib(unittest.TestCase):
    """Tests for imaplib's date helpers and default-host behaviour."""

    def test_Internaldate2tuple(self):
        t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
        self.assertEqual(time.mktime(tt), t0)
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
        self.assertEqual(time.mktime(tt), t0)
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
        self.assertEqual(time.mktime(tt), t0)

    @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
    def test_Internaldate2tuple_issue10941(self):
        self.assertNotEqual(imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
            imaplib.Internaldate2tuple(
                b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))

    def timevalues(self):
        """Return the same instant in every form passed to Time2Internaldate."""
        return [2000000000, 2000000000.0, time.localtime(2000000000),
                (2033, 5, 18, 5, 33, 20, -1, -1, -1),
                (2033, 5, 18, 5, 33, 20, -1, -1, 1),
                datetime.fromtimestamp(2000000000,
                                       timezone(timedelta(0, 2 * 60 * 60))),
                '"18-May-2033 05:33:20 +0200"']

    @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
    # DST rules included to work around quirk where the Gnu C library may not
    # otherwise restore the previous time zone
    @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
    def test_Time2Internaldate(self):
        expected = '"18-May-2033 05:33:20 +0200"'

        for t in self.timevalues():
            internal = imaplib.Time2Internaldate(t)
            self.assertEqual(internal, expected)

    def test_that_Time2Internaldate_returns_a_result(self):
        # Without tzset, we can check only that it successfully
        # produces a result, not the correctness of the result itself,
        # since the result depends on the timezone the machine is in.
        for t in self.timevalues():
            imaplib.Time2Internaldate(t)

    def test_imap4_host_default_value(self):
        # Check whether the IMAP4_PORT is truly unavailable.
        with socket.socket() as s:
            try:
                s.connect(('', imaplib.IMAP4_PORT))
                self.skipTest(
                    "Cannot run the test with local IMAP server running.")
            except OSError:
                pass

        # This is the exception that should be raised.
        expected_errnos = support.get_socket_conn_refused_errs()
        with self.assertRaises(OSError) as cm:
            imaplib.IMAP4()
        self.assertIn(cm.exception.errno, expected_errnos)


if ssl:
    class SecureTCPServer(socketserver.TCPServer):
        """TCPServer whose accepted connections are wrapped in server-side TLS."""

        def get_request(self):
            newsocket, fromaddr = self.socket.accept()
            # Name the server-side protocol explicitly; calling SSLContext()
            # with no protocol argument is the deprecated form.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(CERTFILE)
            connstream = context.wrap_socket(newsocket, server_side=True)
            return connstream, fromaddr

    IMAP4_SSL = imaplib.IMAP4_SSL

else:

    class SecureTCPServer:
        pass

    IMAP4_SSL = None


class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Minimal scripted IMAP server used by the tests.

    Each received line is split into ``tag``, ``cmd`` and ``args`` and
    dispatched to a ``cmd_<CMD>`` method.  A ``cmd_*`` method written as a
    generator acts as a continuation: subsequent raw lines are sent into it
    until it finishes.
    """
    timeout = 1
    continuation = None
    capabilities = ''

    def setup(self):
        super().setup()
        self.server.logged = None

    def _send(self, message):
        if verbose:
            print("SENT: %r" % message.strip())
        self.wfile.write(message)

    def _send_line(self, message):
        self._send(message + b'\r\n')

    def _send_textline(self, message):
        self._send_line(message.encode('ASCII'))

    def _send_tagged(self, tag, code, message):
        self._send_textline(' '.join((tag, code, message)))

    def handle(self):
        # Send a welcome message.
        self._send_textline('* OK IMAP4rev1')
        while True:
            # Gather up input until we receive a line terminator or we timeout.
            # Accumulate read(1) because it's simpler to handle the differences
            # between naked sockets and SSL sockets.
            line = b''
            while True:
                try:
                    part = self.rfile.read(1)
                    if part == b'':
                        # Naked sockets return empty strings..
                        return
                    line += part
                except OSError:
                    # ..but SSLSockets raise exceptions.
                    return
                if line.endswith(b'\r\n'):
                    break

            if verbose:
                print('GOT: %r' % line.strip())
            if self.continuation:
                # A command is waiting for more input: hand it the raw line.
                try:
                    self.continuation.send(line)
                except StopIteration:
                    self.continuation = None
                continue
            splitline = line.decode('ASCII').split()
            tag = splitline[0]
            cmd = splitline[1]
            args = splitline[2:]

            if hasattr(self, 'cmd_' + cmd):
                continuation = getattr(self, 'cmd_' + cmd)(tag, args)
                if continuation:
                    # Generator-based command: run it up to its first yield.
                    self.continuation = continuation
                    next(continuation)
            else:
                self._send_tagged(tag, 'BAD', cmd + ' unknown')

    def cmd_CAPABILITY(self, tag, args):
        caps = ('IMAP4rev1 ' + self.capabilities
                if self.capabilities
                else 'IMAP4rev1')
        self._send_textline('* CAPABILITY ' + caps)
        self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    def cmd_LOGOUT(self, tag, args):
        self.server.logged = None
        self._send_textline('* BYE IMAP4ref1 Server logging out')
        self._send_tagged(tag, 'OK', 'LOGOUT completed')

    def cmd_LOGIN(self, tag, args):
        self.server.logged = args[0]
        self._send_tagged(tag, 'OK', 'LOGIN completed')


class NewIMAPTestsMixin:
    """Tests shared by the plain and SSL variants.

    Concrete subclasses provide ``imap_class`` and ``server_class``.
    """
    client = None

    def _setup(self, imap_handler, connect=True):
        """
        Sets up imap_handler for tests. imap_handler should inherit from either:
        - SimpleIMAPHandler - for testing IMAP commands,
        - socketserver.StreamRequestHandler - if raw access to stream is needed.
        Returns (client, server).
        """
        # NOTE(review): TestTCPServer is defined here but the server below is
        # built from self.server_class, so this handle_error override is never
        # installed.  Left as is, because switching would change how
        # server-side errors surface -- confirm whether that is intended.
        class TestTCPServer(self.server_class):
            def handle_error(self, request, client_address):
                """
                End request and raise the error if one occurs.
                """
                self.close_request(request)
                self.server_close()
                raise

        self.addCleanup(self._cleanup)
        self.server = self.server_class((support.HOST, 0), imap_handler)
        self.thread = threading.Thread(
            name=self._testMethodName+'-server',
            target=self.server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        self.thread.daemon = True  # In case this function raises.
        self.thread.start()

        if connect:
            self.client = self.imap_class(*self.server.server_address)

        return self.client, self.server

    def _cleanup(self):
        """
        Cleans up the test server. This method should not be called manually,
        it is added to the cleanup queue in the _setup method already.
        """
        # if logout was called already we'd raise an exception trying to
        # shutdown the client once again
        if self.client is not None and self.client.state != 'LOGOUT':
            self.client.shutdown()
        # cleanup the server
        self.server.shutdown()
        self.server.server_close()
        support.join_thread(self.thread, 3.0)
        # Explicitly clear the attribute to prevent dangling thread
        self.thread = None

    def test_EOF_without_complete_welcome_message(self):
        # http://bugs.python.org/issue5949
        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                self.wfile.write(b'* OK')
        _, server = self._setup(EOFHandler, connect=False)
        self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
                          *server.server_address)

    def test_line_termination(self):
        class BadNewlineHandler(SimpleIMAPHandler):
            def cmd_CAPABILITY(self, tag, args):
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                self._send_tagged(tag, 'OK', 'CAPABILITY completed')
        _, server = self._setup(BadNewlineHandler, connect=False)
        self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
                          *server.server_address)

    def test_enable_raises_error_if_not_AUTH(self):
        class EnableHandler(SimpleIMAPHandler):
            capabilities = 'AUTH ENABLE UTF8=ACCEPT'
        client, _ = self._setup(EnableHandler)
        self.assertFalse(client.utf8_enabled)
        with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
            client.enable('foo')
        self.assertFalse(client.utf8_enabled)

    def test_enable_raises_error_if_no_capability(self):
        client, _ = self._setup(SimpleIMAPHandler)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'does not support ENABLE'):
            client.enable('foo')

    def test_enable_UTF8_raises_error_if_not_supported(self):
        client, _ = self._setup(SimpleIMAPHandler)
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'does not support ENABLE'):
            client.enable('UTF8=ACCEPT')

    def test_enable_UTF8_True_append(self):
        class UTF8AppendServer(SimpleIMAPHandler):
            capabilities = 'ENABLE UTF8=ACCEPT'
            def cmd_ENABLE(self, tag, args):
                self._send_tagged(tag, 'OK', 'ENABLE successful')
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
            def cmd_APPEND(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'okay')
        client, server = self._setup(UTF8AppendServer)
        self.assertEqual(client._encoding, 'ascii')
        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
        code, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(code, 'OK')
        self.assertEqual(client._encoding, 'utf-8')
        msg_string = 'Subject: üñí©öðé'
        typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
        self.assertEqual(typ, 'OK')
        self.assertEqual(server.response,
                         ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))

    def test_search_disallows_charset_in_utf8_mode(self):
        class UTF8Server(SimpleIMAPHandler):
            capabilities = 'AUTH ENABLE UTF8=ACCEPT'
            def cmd_ENABLE(self, tag, args):
                self._send_tagged(tag, 'OK', 'ENABLE successful')
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
        client, _ = self._setup(UTF8Server)
        typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(typ, 'OK')
        typ, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(typ, 'OK')
        self.assertTrue(client.utf8_enabled)
        with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
            client.search('foo', 'bar')

    def test_bad_auth_name(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_tagged(tag, 'NO',
                    'unrecognized authentication type {}'.format(args[0]))
        client, _ = self._setup(MyServer)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'unrecognized authentication type METHOD'):
            client.authenticate('METHOD', lambda: 1)

    def test_invalid_authentication(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
        client, _ = self._setup(MyServer)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    r'\[AUTHENTICATIONFAILED\] invalid'):
            client.authenticate('MYAUTH', lambda x: b'fake')

    def test_valid_authentication_bytes(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
        client, server = self._setup(MyServer)
        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

    def test_valid_authentication_plain_text(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
        client, server = self._setup(MyServer)
        code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

    @requires_hashdigest('md5')
    def test_login_cram_md5_bytes(self):
        class AuthHandler(SimpleIMAPHandler):
            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                    'VzdG9uLm1jaS5uZXQ=')
                r = yield
                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
                else:
                    self._send_tagged(tag, 'NO', 'No access')
        client, _ = self._setup(AuthHandler)
        self.assertIn('AUTH=CRAM-MD5', client.capabilities)
        ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
        self.assertEqual(ret, "OK")

    @requires_hashdigest('md5')
    def test_login_cram_md5_plain_text(self):
        class AuthHandler(SimpleIMAPHandler):
            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                    'VzdG9uLm1jaS5uZXQ=')
                r = yield
                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
                else:
                    self._send_tagged(tag, 'NO', 'No access')
        client, _ = self._setup(AuthHandler)
        self.assertIn('AUTH=CRAM-MD5', client.capabilities)
        ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
        self.assertEqual(ret, "OK")

    def test_aborted_authentication(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield
                if self.response == b'*\r\n':
                    self._send_tagged(
                        tag,
                        'NO',
                        '[AUTHENTICATIONFAILED] aborted')
                else:
                    self._send_tagged(tag, 'OK', 'MYAUTH successful')
        client, _ = self._setup(MyServer)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    r'\[AUTHENTICATIONFAILED\] aborted'):
            client.authenticate('MYAUTH', lambda x: None)

    @mock.patch('imaplib._MAXLINE', 10)
    def test_linetoolong(self):
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # send response line longer than the limit set in the next line
                self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
        _, server = self._setup(TooLongHandler, connect=False)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'got more than 10 bytes'):
            self.imap_class(*server.server_address)

    def test_simple_with_statement(self):
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.imap_class(*server.server_address):
            pass

    def test_with_statement(self):
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.imap_class(*server.server_address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
        self.assertIsNone(server.logged)

    def test_with_statement_logout(self):
        # It is legal to log out explicitly inside the with block
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.imap_class(*server.server_address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
            imap.logout()
            self.assertIsNone(server.logged)
        self.assertIsNone(server.logged)

    # command tests

    def test_login(self):
        client, _ = self._setup(SimpleIMAPHandler)
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        self.assertEqual(data[0], b'LOGIN completed')
        self.assertEqual(client.state, 'AUTH')

    def test_logout(self):
        client, _ = self._setup(SimpleIMAPHandler)
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        self.assertEqual(data[0], b'LOGIN completed')
        typ, data = client.logout()
        self.assertEqual(typ, 'BYE', (typ, data))
        self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data))
        self.assertEqual(client.state, 'LOGOUT')

    def test_lsub(self):
        class LsubCmd(SimpleIMAPHandler):
            def cmd_LSUB(self, tag, args):
                self._send_textline('* LSUB () "." directoryA')
                return self._send_tagged(tag, 'OK', 'LSUB completed')
        client, _ = self._setup(LsubCmd)
        client.login('user', 'pass')
        typ, data = client.lsub()
        self.assertEqual(typ, 'OK')
        self.assertEqual(data[0], b'() "." directoryA')


class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
    imap_class = imaplib.IMAP4
    server_class = socketserver.TCPServer


@unittest.skipUnless(ssl, "SSL not available")
class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
    imap_class = IMAP4_SSL
    server_class = SecureTCPServer

    def test_ssl_raises(self):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ssl_context.check_hostname, True)
        ssl_context.load_verify_locations(CAFILE)

        with self.assertRaisesRegex(ssl.CertificateError,
                "IP address mismatch, certificate is not valid for "
                "'127.0.0.1'"):
            _, server = self._setup(SimpleIMAPHandler)
            client = self.imap_class(*server.server_address,
                                     ssl_context=ssl_context)
            client.shutdown()

    def test_ssl_verified(self):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_verify_locations(CAFILE)

        _, server = self._setup(SimpleIMAPHandler)
        client = self.imap_class("localhost", server.server_address[1],
                                 ssl_context=ssl_context)
        client.shutdown()

    # Mock the private method _connect(), so mark the test as specific
    # to CPython stdlib
    @cpython_only
    def test_certfile_arg_warn(self):
        with support.check_warnings(('', DeprecationWarning)):
            with mock.patch.object(self.imap_class, 'open'):
                with mock.patch.object(self.imap_class, '_connect'):
                    self.imap_class('localhost', 143, certfile=CERTFILE)


class ThreadedNetworkedTests(unittest.TestCase):
    """Older-style tests that manage the server thread with context managers."""
    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4

    def make_server(self, addr, hdlr):
        """Start a server for *hdlr* on *addr*; return (server, thread)."""

        class MyServer(self.server_class):
            def handle_error(self, request, client_address):
                self.close_request(request)
                self.server_close()
                raise

        if verbose:
            print("creating server")
        server = MyServer(addr, hdlr)
        self.assertEqual(server.server_address, server.socket.getsockname())

        if verbose:
            print("server created")
            print("ADDR =", addr)
            print("CLASS =", self.server_class)
            print("HDLR =", server.RequestHandlerClass)

        t = threading.Thread(
            name='%s serving' % self.server_class,
            target=server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        t.daemon = True  # In case this function raises.
        t.start()
        if verbose:
            print("server running")
        return server, t

    def reap_server(self, server, thread):
        """Shut *server* down, close it, and join its serving *thread*."""
        if verbose:
            print("waiting for server")
        server.shutdown()
        server.server_close()
        thread.join()
        if verbose:
            print("done")

    @contextmanager
    def reaped_server(self, hdlr):
        """Context manager yielding a running server that is reaped on exit."""
        server, thread = self.make_server((support.HOST, 0), hdlr)
        try:
            yield server
        finally:
            self.reap_server(server, thread)

    @contextmanager
    def reaped_pair(self, hdlr):
        """Context manager yielding (server, client); logs the client out on exit."""
        with self.reaped_server(hdlr) as server:
            client = self.imap_class(*server.server_address)
            try:
                yield server, client
            finally:
                client.logout()

    @reap_threads
    def test_connect(self):
        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class(*server.server_address)
            client.shutdown()

    @reap_threads
    def test_bracket_flags(self):

        # This violates RFC 3501, which disallows ']' characters in tag names,
        # but imaplib has allowed producing such tags forever, other programs
        # also produce them (eg: OtherInbox's Organizer app as of 20140716),
        # and Gmail, for example, accepts them and produces them.  So we
        # support them.  See issue #21815.

        class BracketFlagHandler(SimpleIMAPHandler):

            def handle(self):
                self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
                super().handle()

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

            def cmd_SELECT(self, tag, args):
                flag_msg = ' \\'.join(self.flags)
                self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
                self._send_line(b'* 2 EXISTS')
                self._send_line(b'* 0 RECENT')
                msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
                        % flag_msg)
                self._send_line(msg.encode('ascii'))
                self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

            def cmd_STORE(self, tag, args):
                new_flags = args[2].strip('(').strip(')').split()
                self.flags.extend(new_flags)
                flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
                msg = '* %s FETCH %s' % (args[0], flags_msg)
                self._send_line(msg.encode('ascii'))
                self._send_tagged(tag, 'OK', 'STORE completed.')

        with self.reaped_pair(BracketFlagHandler) as (server, client):
            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response, b'ZmFrZQ==\r\n')
            client.select('test')
            typ, [data] = client.store(b'1', "+FLAGS", "[test]")
            self.assertIn(b'[test]', data)
            client.select('test')
            typ, [data] = client.response('PERMANENTFLAGS')
            self.assertIn(b'[test]', data)

    @reap_threads
    def test_issue5949(self):

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)

    @reap_threads
    def test_line_termination(self):

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                self._send_tagged(tag, 'OK', 'CAPABILITY completed')

        with self.reaped_server(BadNewlineHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)

    class UTF8Server(SimpleIMAPHandler):
        capabilities = 'AUTH ENABLE UTF8=ACCEPT'

        def cmd_ENABLE(self, tag, args):
            self._send_tagged(tag, 'OK', 'ENABLE successful')

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

    @reap_threads
    def test_enable_raises_error_if_not_AUTH(self):
        with self.reaped_pair(self.UTF8Server) as (server, client):
            self.assertFalse(client.utf8_enabled)
            self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
            self.assertFalse(client.utf8_enabled)

    # XXX Also need a test that enable after SELECT raises an error.

    @reap_threads
    def test_enable_raises_error_if_no_capability(self):
        class NoEnableServer(self.UTF8Server):
            capabilities = 'AUTH'
        with self.reaped_pair(NoEnableServer) as (server, client):
            self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')

    @reap_threads
    def test_enable_UTF8_raises_error_if_not_supported(self):
        class NonUTF8Server(SimpleIMAPHandler):
            pass
        with self.assertRaises(imaplib.IMAP4.error):
            with self.reaped_pair(NonUTF8Server) as (server, client):
                typ, data = client.login('user', 'pass')
                self.assertEqual(typ, 'OK')
                client.enable('UTF8=ACCEPT')

    @reap_threads
    def test_enable_UTF8_True_append(self):

        class UTF8AppendServer(self.UTF8Server):
            def cmd_APPEND(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'okay')

        with self.reaped_pair(UTF8AppendServer) as (server, client):
            self.assertEqual(client._encoding, 'ascii')
            code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response,
                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
            code, _ = client.enable('UTF8=ACCEPT')
            self.assertEqual(code, 'OK')
            self.assertEqual(client._encoding, 'utf-8')
            msg_string = 'Subject: üñí©öðé'
            typ, data = client.append(
                None, None, None, msg_string.encode('utf-8'))
            self.assertEqual(typ, 'OK')
            self.assertEqual(
                server.response,
                ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
            )

    # XXX also need a test that makes sure that the Literal and Untagged_status
    # regexes uses unicode in UTF8 mode instead of the default ASCII.

    @reap_threads
    def test_search_disallows_charset_in_utf8_mode(self):
        with self.reaped_pair(self.UTF8Server) as (server, client):
            typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(typ, 'OK')
            typ, _ = client.enable('UTF8=ACCEPT')
            self.assertEqual(typ, 'OK')
            self.assertTrue(client.utf8_enabled)
            self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')

    @reap_threads
    def test_bad_auth_name(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_tagged(tag, 'NO', 'unrecognized authentication '
                                  'type {}'.format(args[0]))

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('METHOD', lambda: 1)

    @reap_threads
    def test_invalid_authentication(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('MYAUTH', lambda x: b'fake')

    @reap_threads
    def test_valid_authentication(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

        with self.reaped_pair(MyServer) as (server, client):
            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response,
                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

        with self.reaped_pair(MyServer) as (server, client):
            code, data = client.authenticate('MYAUTH', lambda x: 'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response,
                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

    @reap_threads
    @requires_hashdigest('md5')
    def test_login_cram_md5(self):

        class AuthHandler(SimpleIMAPHandler):

            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                    'VzdG9uLm1jaS5uZXQ=')
                r = yield
                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
                else:
                    self._send_tagged(tag, 'NO', 'No access')

        with self.reaped_pair(AuthHandler) as (server, client):
            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
            ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
            self.assertEqual(ret, "OK")

        with self.reaped_pair(AuthHandler) as (server, client):
            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
            ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
            self.assertEqual(ret, "OK")

    @reap_threads
    def test_aborted_authentication(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield

                if self.response == b'*\r\n':
                    self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')
                else:
                    self._send_tagged(tag, 'OK', 'MYAUTH successful')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('MYAUTH', lambda x: None)

    # Decorated like every other server-spawning test in this class so the
    # serving thread is reaped here too.
    @reap_threads
    def test_linetoolong(self):
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # Send a very long response line
                self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')

        with self.reaped_server(TooLongHandler) as server:
            self.assertRaises(imaplib.IMAP4.error,
                              self.imap_class, *server.server_address)

    @reap_threads
    def test_simple_with_statement(self):
        # simplest call
        with self.reaped_server(SimpleIMAPHandler) as server:
            with self.imap_class(*server.server_address):
                pass

    @reap_threads
    def test_with_statement(self):
        with self.reaped_server(SimpleIMAPHandler) as server:
            with self.imap_class(*server.server_address) as imap:
                imap.login('user', 'pass')
                self.assertEqual(server.logged, 'user')
            self.assertIsNone(server.logged)

    @reap_threads
    def test_with_statement_logout(self):
        # What happens if logout was already called inside the block?
        with self.reaped_server(SimpleIMAPHandler) as server:
            with self.imap_class(*server.server_address) as imap:
                imap.login('user', 'pass')
                self.assertEqual(server.logged, 'user')
                imap.logout()
                self.assertIsNone(server.logged)
            self.assertIsNone(server.logged)


@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
    server_class = SecureTCPServer
    imap_class = IMAP4_SSL

    @reap_threads
    def test_ssl_verified(self):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_verify_locations(CAFILE)

        with self.assertRaisesRegex(
                ssl.CertificateError,
                "IP address mismatch, certificate is not valid for "
                "'127.0.0.1'"):
            with self.reaped_server(SimpleIMAPHandler) as server:
                client = self.imap_class(*server.server_address,
                                         ssl_context=ssl_context)
                client.shutdown()

        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class("localhost", server.server_address[1],
                                     ssl_context=ssl_context)
            client.shutdown()


@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAPTest(unittest.TestCase):
    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        with transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        if self.server is not None:
            with transient_internet(self.host):
                self.server.logout()

    def test_logincapa(self):
        with transient_internet(self.host):
            for cap in self.server.capabilities:
                self.assertIsInstance(cap, str)
            self.assertIn('LOGINDISABLED', self.server.capabilities)
            self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
            rs = self.server.login(self.username, self.password)
            self.assertEqual(rs[0], 'OK')

    def test_logout(self):
        with transient_internet(self.host):
            rs = self.server.logout()
            # Tell tearDown there is nothing left to log out of.
            self.server = None
            self.assertEqual(rs[0], 'BYE', rs)


@unittest.skipUnless(ssl, "SSL not available")
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):

    def setUp(self):
        super().setUp()
        with transient_internet(self.host):
            rs = self.server.starttls()
            self.assertEqual(rs[0], 'OK')

    def test_logincapa(self):
        for cap in self.server.capabilities:
            self.assertIsInstance(cap, str)
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)


@unittest.skipUnless(ssl, "SSL not available")
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    port = 993
    imap_class = IMAP4_SSL

    # Each test opens its own connection, so the inherited fixture that
    # connects in setUp / logs out in tearDown is disabled.
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def create_ssl_context(self):
        """Return a non-verifying client context loaded with CERTFILE."""
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        ssl_context.load_cert_chain(CERTFILE)
        return ssl_context

    def check_logincapa(self, server):
        """Check *server*'s capabilities and login, then always log out."""
        try:
            for cap in server.capabilities:
                self.assertIsInstance(cap, str)
            self.assertNotIn('LOGINDISABLED', server.capabilities)
            self.assertIn('AUTH=PLAIN', server.capabilities)
            rs = server.login(self.username, self.password)
            self.assertEqual(rs[0], 'OK')
        finally:
            server.logout()

    def test_logincapa(self):
        with transient_internet(self.host):
            _server = self.imap_class(self.host, self.port)
            self.check_logincapa(_server)

    def test_logout(self):
        with transient_internet(self.host):
            _server = self.imap_class(self.host, self.port)
            rs = _server.logout()
            self.assertEqual(rs[0], 'BYE', rs)

    def test_ssl_context_certfile_exclusive(self):
        with transient_internet(self.host):
            self.assertRaises(
                ValueError, self.imap_class, self.host, self.port,
                certfile=CERTFILE, ssl_context=self.create_ssl_context())

    def test_ssl_context_keyfile_exclusive(self):
        with transient_internet(self.host):
            self.assertRaises(
                ValueError, self.imap_class, self.host, self.port,
                keyfile=CERTFILE, ssl_context=self.create_ssl_context())


if __name__ == "__main__":
    unittest.main()