"""Tests for imaplib.

The module contains three groups of tests:

* TestImaplib - the date conversion helpers (no network involved);
* NewIMAPTests* / ThreadedNetworkedTests* - the IMAP4 client driven
  against a small in-process server (plain TCP and, when the ssl module
  is importable, SSL);
* RemoteIMAP* - tests against a remote host, run only when the
  'network' test resource is enabled.
"""

from test import support
# If we end up with a significant number of tests that don't require
# threading, this test module should be split.  Right now we skip
# them all if we don't have threading.
threading = support.import_module('threading')

from contextlib import contextmanager
import imaplib
import os.path
import socketserver
import time
import calendar
import inspect

from test.support import (reap_threads, verbose, transient_internet,
                          run_with_tz, run_with_locale)
import unittest
from unittest import mock
from datetime import datetime, timezone, timedelta
try:
    import ssl
except ImportError:
    ssl = None

# Certificate files are looked up next to this test module.
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")


class TestImaplib(unittest.TestCase):
    """Tests for Internaldate2tuple() and Time2Internaldate()."""

    def test_Internaldate2tuple(self):
        # The same instant expressed with three different UTC offsets
        # must map to the same timestamp.
        t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
        self.assertEqual(time.mktime(tt), t0)
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
        self.assertEqual(time.mktime(tt), t0)
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
        self.assertEqual(time.mktime(tt), t0)

    @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
    def test_Internaldate2tuple_issue10941(self):
        # Two dates one hour apart must not collapse to the same tuple.
        self.assertNotEqual(imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
            imaplib.Internaldate2tuple(
                b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))

    def timevalues(self):
        """Return one sample of each input form fed to Time2Internaldate()."""
        return [2000000000, 2000000000.0, time.localtime(2000000000),
                (2033, 5, 18, 5, 33, 20, -1, -1, -1),
                (2033, 5, 18, 5, 33, 20, -1, -1, 1),
                datetime.fromtimestamp(2000000000,
                                       timezone(timedelta(0, 2 * 60 * 60))),
                '"18-May-2033 05:33:20 +0200"']

    @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
    # DST rules included to work around quirk where the Gnu C library may not
    # otherwise restore the previous time zone
    @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
    def test_Time2Internaldate(self):
        expected = '"18-May-2033 05:33:20 +0200"'

        for t in self.timevalues():
            internal = imaplib.Time2Internaldate(t)
            self.assertEqual(internal, expected)

    def test_that_Time2Internaldate_returns_a_result(self):
        # Without tzset, we can check only that it successfully
        # produces a result, not the correctness of the result itself,
        # since the result depends on the timezone the machine is in.
        for t in self.timevalues():
            imaplib.Time2Internaldate(t)


if ssl:
    class SecureTCPServer(socketserver.TCPServer):
        """TCPServer that wraps every accepted connection in SSL."""

        def get_request(self):
            """Accept a connection and return (ssl_socket, client_address)."""
            newsocket, fromaddr = self.socket.accept()
            context = ssl.SSLContext()
            context.load_cert_chain(CERTFILE)
            connstream = context.wrap_socket(newsocket, server_side=True)
            return connstream, fromaddr

    IMAP4_SSL = imaplib.IMAP4_SSL

else:

    # Placeholders so that the SSL test classes below can still be
    # defined (they are skipped when ssl is unavailable).
    class SecureTCPServer:
        pass

    IMAP4_SSL = None


class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Minimal IMAP server used as the peer in the client tests.

    A command "<tag> NAME args..." is dispatched to a method named
    cmd_NAME(tag, args).  A cmd_* method may be a generator: after it is
    started, every following input line is sent into it (instead of being
    parsed as a command) until it finishes.
    """
    timeout = 1
    # Generator of the command currently waiting for more client input.
    continuation = None
    # Extra capabilities advertised after 'IMAP4rev1' by cmd_CAPABILITY.
    capabilities = ''

    def setup(self):
        super().setup()
        # Name of the logged-in user, inspected by the tests.
        self.server.logged = None

    def _send(self, message):
        """Write raw bytes to the client."""
        if verbose:
            print("SENT: %r" % message.strip())
        self.wfile.write(message)

    def _send_line(self, message):
        """Write bytes followed by CRLF."""
        self._send(message + b'\r\n')

    def _send_textline(self, message):
        """Write an ASCII str followed by CRLF."""
        self._send_line(message.encode('ASCII'))

    def _send_tagged(self, tag, code, message):
        """Write a tagged response line: "<tag> <code> <message>"."""
        self._send_textline(' '.join((tag, code, message)))

    def handle(self):
        # Send a welcome message.
        self._send_textline('* OK IMAP4rev1')
        while True:
            # Gather up input until we receive a line terminator or we timeout.
            # Accumulate read(1) because it's simpler to handle the differences
            # between naked sockets and SSL sockets.
            line = b''
            while True:
                try:
                    part = self.rfile.read(1)
                    if part == b'':
                        # Naked sockets return empty strings..
                        return
                    line += part
                except OSError:
                    # ..but SSLSockets raise exceptions.
                    return
                if line.endswith(b'\r\n'):
                    break

            if verbose:
                print('GOT: %r' % line.strip())
            if self.continuation:
                # A command is waiting for data: hand the line over to it
                # rather than parsing it as a new command.
                try:
                    self.continuation.send(line)
                except StopIteration:
                    self.continuation = None
                continue
            splitline = line.decode('ASCII').split()
            tag = splitline[0]
            cmd = splitline[1]
            args = splitline[2:]

            if hasattr(self, 'cmd_' + cmd):
                continuation = getattr(self, 'cmd_' + cmd)(tag, args)
                if continuation:
                    # Generator command: run it up to its first yield.
                    self.continuation = continuation
                    next(continuation)
            else:
                self._send_tagged(tag, 'BAD', cmd + ' unknown')

    def cmd_CAPABILITY(self, tag, args):
        caps = ('IMAP4rev1 ' + self.capabilities
                if self.capabilities
                else 'IMAP4rev1')
        self._send_textline('* CAPABILITY ' + caps)
        self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    def cmd_LOGOUT(self, tag, args):
        self.server.logged = None
        # The exact text below is asserted by the logout tests.
        self._send_textline('* BYE IMAP4ref1 Server logging out')
        self._send_tagged(tag, 'OK', 'LOGOUT completed')

    def cmd_LOGIN(self, tag, args):
        self.server.logged = args[0]
        self._send_tagged(tag, 'OK', 'LOGIN completed')


class NewIMAPTestsMixin:
    """Client tests run against a server started by _setup().

    Concrete subclasses provide ``imap_class`` (the client class) and
    ``server_class`` (the socketserver class).
    """
    client = None

    def _setup(self, imap_handler, connect=True):
        """
        Sets up imap_handler for tests. imap_handler should inherit from either:
        - SimpleIMAPHandler - for testing IMAP commands,
        - socketserver.StreamRequestHandler - if raw access to stream is needed.
        Returns (client, server).
        """
        # NOTE(review): TestTCPServer is defined but never instantiated; the
        # server below is created from self.server_class, so this
        # handle_error() override is not in effect.  Left unchanged to
        # preserve the current behaviour -- confirm the intent.
        class TestTCPServer(self.server_class):
            def handle_error(self, request, client_address):
                """
                End request and raise the error if one occurs.
                """
                self.close_request(request)
                self.server_close()
                raise

        self.addCleanup(self._cleanup)
        self.server = self.server_class((support.HOST, 0), imap_handler)
        self.thread = threading.Thread(
            name=self._testMethodName+'-server',
            target=self.server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        self.thread.daemon = True  # In case this function raises.
        self.thread.start()

        if connect:
            self.client = self.imap_class(*self.server.server_address)

        return self.client, self.server

    def _cleanup(self):
        """
        Cleans up the test server. This method should not be called manually,
        it is added to the cleanup queue in the _setup method already.
        """
        # if logout was called already we'd raise an exception trying to
        # shutdown the client once again
        if self.client is not None and self.client.state != 'LOGOUT':
            self.client.shutdown()
        # cleanup the server
        self.server.shutdown()
        self.server.server_close()
        self.thread.join(3.0)

    def test_EOF_without_complete_welcome_message(self):
        # http://bugs.python.org/issue5949
        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                self.wfile.write(b'* OK')
        _, server = self._setup(EOFHandler, connect=False)
        self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
                          *server.server_address)

    def test_line_termination(self):
        # A response line ending in a bare LF must abort the connection.
        class BadNewlineHandler(SimpleIMAPHandler):
            def cmd_CAPABILITY(self, tag, args):
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                self._send_tagged(tag, 'OK', 'CAPABILITY completed')
        _, server = self._setup(BadNewlineHandler, connect=False)
        self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
                          *server.server_address)

    def test_enable_raises_error_if_not_AUTH(self):
        class EnableHandler(SimpleIMAPHandler):
            capabilities = 'AUTH ENABLE UTF8=ACCEPT'
        client, _ = self._setup(EnableHandler)
        self.assertFalse(client.utf8_enabled)
        with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
            client.enable('foo')
        self.assertFalse(client.utf8_enabled)

    def test_enable_raises_error_if_no_capability(self):
        client, _ = self._setup(SimpleIMAPHandler)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'does not support ENABLE'):
            client.enable('foo')

    def test_enable_UTF8_raises_error_if_not_supported(self):
        client, _ = self._setup(SimpleIMAPHandler)
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'does not support ENABLE'):
            client.enable('UTF8=ACCEPT')

    def test_enable_UTF8_True_append(self):
        class UTF8AppendServer(SimpleIMAPHandler):
            capabilities = 'ENABLE UTF8=ACCEPT'
            def cmd_ENABLE(self, tag, args):
                self._send_tagged(tag, 'OK', 'ENABLE successful')
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
            def cmd_APPEND(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'okay')
        client, server = self._setup(UTF8AppendServer)
        self.assertEqual(client._encoding, 'ascii')
        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
        code, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(code, 'OK')
        self.assertEqual(client._encoding, 'utf-8')
        msg_string = 'Subject: üñí©öðé'
        typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
        self.assertEqual(typ, 'OK')
        self.assertEqual(server.response,
                         ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))

    def test_search_disallows_charset_in_utf8_mode(self):
        class UTF8Server(SimpleIMAPHandler):
            capabilities = 'AUTH ENABLE UTF8=ACCEPT'
            def cmd_ENABLE(self, tag, args):
                self._send_tagged(tag, 'OK', 'ENABLE successful')
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
        client, _ = self._setup(UTF8Server)
        typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(typ, 'OK')
        typ, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(typ, 'OK')
        self.assertTrue(client.utf8_enabled)
        with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
            client.search('foo', 'bar')

    def test_bad_auth_name(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_tagged(tag, 'NO',
                    'unrecognized authentication type {}'.format(args[0]))
        client, _ = self._setup(MyServer)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'unrecognized authentication type METHOD'):
            client.authenticate('METHOD', lambda: 1)

    def test_invalid_authentication(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
        client, _ = self._setup(MyServer)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    r'\[AUTHENTICATIONFAILED\] invalid'):
            client.authenticate('MYAUTH', lambda x: b'fake')

    def test_valid_authentication_bytes(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
        client, server = self._setup(MyServer)
        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

    def test_valid_authentication_plain_text(self):
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
        client, server = self._setup(MyServer)
        code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

    def test_login_cram_md5_bytes(self):
        class AuthHandler(SimpleIMAPHandler):
            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                    'VzdG9uLm1jaS5uZXQ=')
                r = yield
                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
                else:
                    self._send_tagged(tag, 'NO', 'No access')
        client, _ = self._setup(AuthHandler)
        self.assertIn('AUTH=CRAM-MD5', client.capabilities)
        ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
        self.assertEqual(ret, "OK")

    def test_login_cram_md5_plain_text(self):
        class AuthHandler(SimpleIMAPHandler):
            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                    'VzdG9uLm1jaS5uZXQ=')
                r = yield
                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
                else:
                    self._send_tagged(tag, 'NO', 'No access')
        client, _ = self._setup(AuthHandler)
        self.assertIn('AUTH=CRAM-MD5', client.capabilities)
        ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
        self.assertEqual(ret, "OK")

    def test_aborted_authentication(self):
        # The auth callback returns None; the handler below answers NO
        # when the client then sends a lone '*'.
        class MyServer(SimpleIMAPHandler):
            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield
                if self.response == b'*\r\n':
                    self._send_tagged(
                        tag,
                        'NO',
                        '[AUTHENTICATIONFAILED] aborted')
                else:
                    self._send_tagged(tag, 'OK', 'MYAUTH successful')
        client, _ = self._setup(MyServer)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    r'\[AUTHENTICATIONFAILED\] aborted'):
            client.authenticate('MYAUTH', lambda x: None)

    @mock.patch('imaplib._MAXLINE', 10)
    def test_linetoolong(self):
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # send a response line longer than the limit set by the
                # mock.patch decorator above
                self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
        _, server = self._setup(TooLongHandler, connect=False)
        with self.assertRaisesRegex(imaplib.IMAP4.error,
                                    'got more than 10 bytes'):
            self.imap_class(*server.server_address)

    def test_simple_with_statement(self):
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.imap_class(*server.server_address):
            pass

    def test_with_statement(self):
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.imap_class(*server.server_address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
        self.assertIsNone(server.logged)

    def test_with_statement_logout(self):
        # It is legal to log out explicitly inside the with block
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.imap_class(*server.server_address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
            imap.logout()
            self.assertIsNone(server.logged)
        self.assertIsNone(server.logged)

    # command tests

    def test_login(self):
        client, _ = self._setup(SimpleIMAPHandler)
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        self.assertEqual(data[0], b'LOGIN completed')
        self.assertEqual(client.state, 'AUTH')

    def test_logout(self):
        client, _ = self._setup(SimpleIMAPHandler)
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        self.assertEqual(data[0], b'LOGIN completed')
        typ, data = client.logout()
        self.assertEqual(typ, 'BYE')
        self.assertEqual(data[0], b'IMAP4ref1 Server logging out')
        self.assertEqual(client.state, 'LOGOUT')

    def test_lsub(self):
        class LsubCmd(SimpleIMAPHandler):
            def cmd_LSUB(self, tag, args):
                self._send_textline('* LSUB () "." directoryA')
                return self._send_tagged(tag, 'OK', 'LSUB completed')
        client, _ = self._setup(LsubCmd)
        client.login('user', 'pass')
        typ, data = client.lsub()
        self.assertEqual(typ, 'OK')
        self.assertEqual(data[0], b'() "." directoryA')


class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
    """Run the mixin tests over plain TCP."""
    imap_class = imaplib.IMAP4
    server_class = socketserver.TCPServer


@unittest.skipUnless(ssl, "SSL not available")
class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
    """Run the mixin tests over SSL, plus certificate checks."""
    imap_class = IMAP4_SSL
    server_class = SecureTCPServer

    def test_ssl_raises(self):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ssl_context.check_hostname = True
        ssl_context.load_verify_locations(CAFILE)

        with self.assertRaisesRegex(ssl.CertificateError,
                "hostname '127.0.0.1' doesn't match 'localhost'"):
            _, server = self._setup(SimpleIMAPHandler)
            client = self.imap_class(*server.server_address,
                                     ssl_context=ssl_context)
            client.shutdown()

    def test_ssl_verified(self):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ssl_context.check_hostname = True
        ssl_context.load_verify_locations(CAFILE)

        _, server = self._setup(SimpleIMAPHandler)
        client = self.imap_class("localhost", server.server_address[1],
                                 ssl_context=ssl_context)
        client.shutdown()


class ThreadedNetworkedTests(unittest.TestCase):
    """Client tests using explicitly managed server threads.

    The server and client are created through the reaped_server() and
    reaped_pair() context managers, which shut everything down on exit.
    """
    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4

    def make_server(self, addr, hdlr):
        """Start a server for hdlr on addr; return (server, thread)."""

        class MyServer(self.server_class):
            def handle_error(self, request, client_address):
                # Close everything and re-raise the exception currently
                # being handled.
                self.close_request(request)
                self.server_close()
                raise

        if verbose:
            print("creating server")
        server = MyServer(addr, hdlr)
        self.assertEqual(server.server_address, server.socket.getsockname())

        if verbose:
            print("server created")
            print("ADDR =", addr)
            print("CLASS =", self.server_class)
            print("HDLR =", server.RequestHandlerClass)

        t = threading.Thread(
            name='%s serving' % self.server_class,
            target=server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        t.daemon = True  # In case this function raises.
        t.start()
        if verbose:
            print("server running")
        return server, t

    def reap_server(self, server, thread):
        """Shut down the server and wait for its thread to finish."""
        if verbose:
            print("waiting for server")
        server.shutdown()
        server.server_close()
        thread.join()
        if verbose:
            print("done")

    @contextmanager
    def reaped_server(self, hdlr):
        """Context manager yielding a running server that is reaped on exit."""
        server, thread = self.make_server((support.HOST, 0), hdlr)
        try:
            yield server
        finally:
            self.reap_server(server, thread)

    @contextmanager
    def reaped_pair(self, hdlr):
        """Context manager yielding (server, client); logs the client out."""
        with self.reaped_server(hdlr) as server:
            client = self.imap_class(*server.server_address)
            try:
                yield server, client
            finally:
                client.logout()

    @reap_threads
    def test_connect(self):
        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class(*server.server_address)
            client.shutdown()

    @reap_threads
    def test_bracket_flags(self):

        # This violates RFC 3501, which disallows ']' characters in tag names,
        # but imaplib has allowed producing such tags forever, other programs
        # also produce them (eg: OtherInbox's Organizer app as of 20140716),
        # and Gmail, for example, accepts them and produces them.  So we
        # support them.  See issue #21815.

        class BracketFlagHandler(SimpleIMAPHandler):

            def handle(self):
                self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
                super().handle()

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

            def cmd_SELECT(self, tag, args):
                flag_msg = ' \\'.join(self.flags)
                self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
                self._send_line(b'* 2 EXISTS')
                self._send_line(b'* 0 RECENT')
                msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
                       % flag_msg)
                self._send_line(msg.encode('ascii'))
                self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

            def cmd_STORE(self, tag, args):
                new_flags = args[2].strip('(').strip(')').split()
                self.flags.extend(new_flags)
                flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
                msg = '* %s FETCH %s' % (args[0], flags_msg)
                self._send_line(msg.encode('ascii'))
                self._send_tagged(tag, 'OK', 'STORE completed.')

        with self.reaped_pair(BracketFlagHandler) as (server, client):
            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response, b'ZmFrZQ==\r\n')
            client.select('test')
            typ, [data] = client.store(b'1', "+FLAGS", "[test]")
            self.assertIn(b'[test]', data)
            client.select('test')
            typ, [data] = client.response('PERMANENTFLAGS')
            self.assertIn(b'[test]', data)

    @reap_threads
    def test_issue5949(self):

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)

    @reap_threads
    def test_line_termination(self):

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                self._send_tagged(tag, 'OK', 'CAPABILITY completed')

        with self.reaped_server(BadNewlineHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)

    class UTF8Server(SimpleIMAPHandler):
        """Handler advertising ENABLE and UTF8=ACCEPT with a fake AUTH."""
        capabilities = 'AUTH ENABLE UTF8=ACCEPT'

        def cmd_ENABLE(self, tag, args):
            self._send_tagged(tag, 'OK', 'ENABLE successful')

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

    @reap_threads
    def test_enable_raises_error_if_not_AUTH(self):
        with self.reaped_pair(self.UTF8Server) as (server, client):
            self.assertFalse(client.utf8_enabled)
            self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
            self.assertFalse(client.utf8_enabled)

    # XXX Also need a test that enable after SELECT raises an error.

    @reap_threads
    def test_enable_raises_error_if_no_capability(self):
        class NoEnableServer(self.UTF8Server):
            capabilities = 'AUTH'
        with self.reaped_pair(NoEnableServer) as (server, client):
            self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')

    @reap_threads
    def test_enable_UTF8_raises_error_if_not_supported(self):
        class NonUTF8Server(SimpleIMAPHandler):
            pass
        with self.assertRaises(imaplib.IMAP4.error):
            with self.reaped_pair(NonUTF8Server) as (server, client):
                typ, data = client.login('user', 'pass')
                self.assertEqual(typ, 'OK')
                client.enable('UTF8=ACCEPT')

    @reap_threads
    def test_enable_UTF8_True_append(self):

        class UTF8AppendServer(self.UTF8Server):
            def cmd_APPEND(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'okay')

        with self.reaped_pair(UTF8AppendServer) as (server, client):
            self.assertEqual(client._encoding, 'ascii')
            code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response,
                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
            code, _ = client.enable('UTF8=ACCEPT')
            self.assertEqual(code, 'OK')
            self.assertEqual(client._encoding, 'utf-8')
            msg_string = 'Subject: üñí©öðé'
            typ, data = client.append(
                None, None, None, msg_string.encode('utf-8'))
            self.assertEqual(typ, 'OK')
            self.assertEqual(
                server.response,
                ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
            )

    # XXX also need a test that makes sure that the Literal and Untagged_status
    # regexes uses unicode in UTF8 mode instead of the default ASCII.

    @reap_threads
    def test_search_disallows_charset_in_utf8_mode(self):
        with self.reaped_pair(self.UTF8Server) as (server, client):
            typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(typ, 'OK')
            typ, _ = client.enable('UTF8=ACCEPT')
            self.assertEqual(typ, 'OK')
            self.assertTrue(client.utf8_enabled)
            self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')

    @reap_threads
    def test_bad_auth_name(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_tagged(tag, 'NO', 'unrecognized authentication '
                                  'type {}'.format(args[0]))

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('METHOD', lambda: 1)

    @reap_threads
    def test_invalid_authentication(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                code, data = client.authenticate('MYAUTH', lambda x: b'fake')

    @reap_threads
    def test_valid_authentication(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.server.response = yield
                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

        with self.reaped_pair(MyServer) as (server, client):
            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response,
                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

        with self.reaped_pair(MyServer) as (server, client):
            code, data = client.authenticate('MYAUTH', lambda x: 'fake')
            self.assertEqual(code, 'OK')
            self.assertEqual(server.response,
                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'

    @reap_threads
    def test_login_cram_md5(self):

        class AuthHandler(SimpleIMAPHandler):

            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                    'VzdG9uLm1jaS5uZXQ=')
                r = yield
                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
                else:
                    self._send_tagged(tag, 'NO', 'No access')

        with self.reaped_pair(AuthHandler) as (server, client):
            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
            ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
            self.assertEqual(ret, "OK")

        with self.reaped_pair(AuthHandler) as (server, client):
            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
            ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
            self.assertEqual(ret, "OK")

    @reap_threads
    def test_aborted_authentication(self):

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                self._send_textline('+')
                self.response = yield

                if self.response == b'*\r\n':
                    self._send_tagged(tag, 'NO',
                                      '[AUTHENTICATIONFAILED] aborted')
                else:
                    self._send_tagged(tag, 'OK', 'MYAUTH successful')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                code, data = client.authenticate('MYAUTH', lambda x: None)

    # Decorated like every other server-spawning test in this class.
    @reap_threads
    def test_linetoolong(self):
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # Send a very long response line
                self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')

        with self.reaped_server(TooLongHandler) as server:
            self.assertRaises(imaplib.IMAP4.error,
                              self.imap_class, *server.server_address)

    @reap_threads
    def test_simple_with_statement(self):
        # simplest call
        with self.reaped_server(SimpleIMAPHandler) as server:
            with self.imap_class(*server.server_address):
                pass

    @reap_threads
    def test_with_statement(self):
        with self.reaped_server(SimpleIMAPHandler) as server:
            with self.imap_class(*server.server_address) as imap:
                imap.login('user', 'pass')
                self.assertEqual(server.logged, 'user')
            self.assertIsNone(server.logged)

    @reap_threads
    def test_with_statement_logout(self):
        # what happens if already logout in the block?
        with self.reaped_server(SimpleIMAPHandler) as server:
            with self.imap_class(*server.server_address) as imap:
                imap.login('user', 'pass')
                self.assertEqual(server.logged, 'user')
                imap.logout()
                self.assertIsNone(server.logged)
            self.assertIsNone(server.logged)


@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
    """Run ThreadedNetworkedTests over SSL, plus a certificate check."""
    server_class = SecureTCPServer
    imap_class = IMAP4_SSL

    @reap_threads
    def test_ssl_verified(self):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ssl_context.check_hostname = True
        ssl_context.load_verify_locations(CAFILE)

        # Connecting by IP address must fail hostname verification ...
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "hostname '127.0.0.1' doesn't match 'localhost'"):
            with self.reaped_server(SimpleIMAPHandler) as server:
                client = self.imap_class(*server.server_address,
                                         ssl_context=ssl_context)
                client.shutdown()

        # ... while connecting as "localhost" must succeed.
        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class("localhost", server.server_address[1],
                                     ssl_context=ssl_context)
            client.shutdown()


@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
class RemoteIMAPTest(unittest.TestCase):
    """Tests against a remote IMAP server (needs the 'network' resource)."""
    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        with transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        # test_logout() sets self.server to None after logging out itself.
        if self.server is not None:
            with transient_internet(self.host):
                self.server.logout()

    def test_logincapa(self):
        with transient_internet(self.host):
            for cap in self.server.capabilities:
                self.assertIsInstance(cap, str)
            self.assertIn('LOGINDISABLED', self.server.capabilities)
            self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
            rs = self.server.login(self.username, self.password)
            self.assertEqual(rs[0], 'OK')

    def test_logout(self):
        with transient_internet(self.host):
            rs = self.server.logout()
            self.server = None
            self.assertEqual(rs[0], 'BYE')


@unittest.skipUnless(ssl, "SSL not available")
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
    """RemoteIMAPTest with the connection upgraded through STARTTLS."""

    def setUp(self):
        super().setUp()
        with transient_internet(self.host):
            rs = self.server.starttls()
            self.assertEqual(rs[0], 'OK')

    def test_logincapa(self):
        for cap in self.server.capabilities:
            self.assertIsInstance(cap, str)
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)


@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """Remote tests over IMAP4_SSL; every test opens its own connection."""
    port = 993
    imap_class = IMAP4_SSL

    def setUp(self):
        # Connections are created inside the individual tests.
        pass

    def tearDown(self):
        pass

    def create_ssl_context(self):
        """Return an SSLContext with the test certificate chain loaded."""
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ssl_context.load_cert_chain(CERTFILE)
        return ssl_context

    def check_logincapa(self, server):
        """Check capabilities and login on server, then always log out."""
        try:
            for cap in server.capabilities:
                self.assertIsInstance(cap, str)
            self.assertNotIn('LOGINDISABLED', server.capabilities)
            self.assertIn('AUTH=PLAIN', server.capabilities)
            rs = server.login(self.username, self.password)
            self.assertEqual(rs[0], 'OK')
        finally:
            server.logout()

    def test_logincapa(self):
        with transient_internet(self.host):
            _server = self.imap_class(self.host, self.port)
            self.check_logincapa(_server)

    def test_logincapa_with_client_certfile(self):
        with transient_internet(self.host):
            with support.check_warnings(('', DeprecationWarning)):
                _server = self.imap_class(self.host, self.port,
                                          certfile=CERTFILE)
                self.check_logincapa(_server)

    def test_logincapa_with_client_ssl_context(self):
        with transient_internet(self.host):
            _server = self.imap_class(
                self.host, self.port, ssl_context=self.create_ssl_context())
            self.check_logincapa(_server)

    def test_logout(self):
        with transient_internet(self.host):
            _server = self.imap_class(self.host, self.port)
            rs = _server.logout()
            self.assertEqual(rs[0], 'BYE')

    def test_ssl_context_certfile_exclusive(self):
        with transient_internet(self.host):
            self.assertRaises(
                ValueError, self.imap_class, self.host, self.port,
                certfile=CERTFILE, ssl_context=self.create_ssl_context())

    def test_ssl_context_keyfile_exclusive(self):
        with transient_internet(self.host):
            self.assertRaises(
                ValueError, self.imap_class, self.host, self.port,
                keyfile=CERTFILE, ssl_context=self.create_ssl_context())


if __name__ == "__main__":
    unittest.main()