1from test import test_support as support 2# If we end up with a significant number of tests that don't require 3# threading, this test module should be split. Right now we skip 4# them all if we don't have threading. 5threading = support.import_module('threading') 6 7from contextlib import contextmanager 8import imaplib 9import os.path 10import SocketServer 11import time 12 13from test_support import reap_threads, verbose, transient_internet 14import unittest 15 16try: 17 import ssl 18except ImportError: 19 ssl = None 20 21CERTFILE = None 22 23 24class TestImaplib(unittest.TestCase): 25 26 def test_that_Time2Internaldate_returns_a_result(self): 27 # We can check only that it successfully produces a result, 28 # not the correctness of the result itself, since the result 29 # depends on the timezone the machine is in. 30 timevalues = [2000000000, 2000000000.0, time.localtime(2000000000), 31 '"18-May-2033 05:33:20 +0200"'] 32 33 for t in timevalues: 34 imaplib.Time2Internaldate(t) 35 36 37if ssl: 38 39 class SecureTCPServer(SocketServer.TCPServer): 40 41 def get_request(self): 42 newsocket, fromaddr = self.socket.accept() 43 connstream = ssl.wrap_socket(newsocket, 44 server_side=True, 45 certfile=CERTFILE) 46 return connstream, fromaddr 47 48 IMAP4_SSL = imaplib.IMAP4_SSL 49 50else: 51 52 class SecureTCPServer: 53 pass 54 55 IMAP4_SSL = None 56 57 58class SimpleIMAPHandler(SocketServer.StreamRequestHandler): 59 60 timeout = 1 61 62 def _send(self, message): 63 if verbose: print "SENT:", message.strip() 64 self.wfile.write(message) 65 66 def handle(self): 67 # Send a welcome message. 68 self._send('* OK IMAP4rev1\r\n') 69 while 1: 70 # Gather up input until we receive a line terminator or we timeout. 71 # Accumulate read(1) because it's simpler to handle the differences 72 # between naked sockets and SSL sockets. 73 line = '' 74 while 1: 75 try: 76 part = self.rfile.read(1) 77 if part == '': 78 # Naked sockets return empty strings.. 79 return 80 line += part 81 except IOError: 82 # ..but SSLSockets raise exceptions. 83 return 84 if line.endswith('\r\n'): 85 break 86 87 if verbose: print 'GOT:', line.strip() 88 splitline = line.split() 89 tag = splitline[0] 90 cmd = splitline[1] 91 args = splitline[2:] 92 93 if hasattr(self, 'cmd_%s' % (cmd,)): 94 getattr(self, 'cmd_%s' % (cmd,))(tag, args) 95 else: 96 self._send('%s BAD %s unknown\r\n' % (tag, cmd)) 97 98 def cmd_CAPABILITY(self, tag, args): 99 self._send('* CAPABILITY IMAP4rev1\r\n') 100 self._send('%s OK CAPABILITY completed\r\n' % (tag,)) 101 102 103class BaseThreadedNetworkedTests(unittest.TestCase): 104 105 def make_server(self, addr, hdlr): 106 107 class MyServer(self.server_class): 108 def handle_error(self, request, client_address): 109 self.close_request(request) 110 self.server_close() 111 raise 112 113 if verbose: print "creating server" 114 server = MyServer(addr, hdlr) 115 self.assertEqual(server.server_address, server.socket.getsockname()) 116 117 if verbose: 118 print "server created" 119 print "ADDR =", addr 120 print "CLASS =", self.server_class 121 print "HDLR =", server.RequestHandlerClass 122 123 t = threading.Thread( 124 name='%s serving' % self.server_class, 125 target=server.serve_forever, 126 # Short poll interval to make the test finish quickly. 127 # Time between requests is short enough that we won't wake 128 # up spuriously too many times. 129 kwargs={'poll_interval':0.01}) 130 t.daemon = True # In case this function raises. 131 t.start() 132 if verbose: print "server running" 133 return server, t 134 135 def reap_server(self, server, thread): 136 if verbose: print "waiting for server" 137 server.shutdown() 138 thread.join() 139 if verbose: print "done" 140 141 @contextmanager 142 def reaped_server(self, hdlr): 143 server, thread = self.make_server((support.HOST, 0), hdlr) 144 try: 145 yield server 146 finally: 147 self.reap_server(server, thread) 148 149 @reap_threads 150 def test_connect(self): 151 with self.reaped_server(SimpleIMAPHandler) as server: 152 client = self.imap_class(*server.server_address) 153 client.shutdown() 154 155 @reap_threads 156 def test_issue5949(self): 157 158 class EOFHandler(SocketServer.StreamRequestHandler): 159 def handle(self): 160 # EOF without sending a complete welcome message. 161 self.wfile.write('* OK') 162 163 with self.reaped_server(EOFHandler) as server: 164 self.assertRaises(imaplib.IMAP4.abort, 165 self.imap_class, *server.server_address) 166 167 168 def test_linetoolong(self): 169 class TooLongHandler(SimpleIMAPHandler): 170 def handle(self): 171 # Send a very long response line 172 self.wfile.write('* OK ' + imaplib._MAXLINE*'x' + '\r\n') 173 174 with self.reaped_server(TooLongHandler) as server: 175 self.assertRaises(imaplib.IMAP4.error, 176 self.imap_class, *server.server_address) 177 178class ThreadedNetworkedTests(BaseThreadedNetworkedTests): 179 180 server_class = SocketServer.TCPServer 181 imap_class = imaplib.IMAP4 182 183 184@unittest.skipUnless(ssl, "SSL not available") 185class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): 186 187 server_class = SecureTCPServer 188 imap_class = IMAP4_SSL 189 190 def test_linetoolong(self): 191 raise unittest.SkipTest("test is not reliable on 2.7; see issue 20118") 192 193 194class RemoteIMAPTest(unittest.TestCase): 195 host = 'cyrus.andrew.cmu.edu' 196 port = 143 197 username = 'anonymous' 198 password = 'pass' 199 imap_class = imaplib.IMAP4 200 201 def setUp(self): 202 with transient_internet(self.host): 203 self.server = self.imap_class(self.host, self.port) 204 205 def tearDown(self): 206 if self.server is not None: 207 self.server.logout() 208 209 def test_logincapa(self): 210 self.assertTrue('LOGINDISABLED' in self.server.capabilities) 211 212 def test_anonlogin(self): 213 self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities) 214 rs = self.server.login(self.username, self.password) 215 self.assertEqual(rs[0], 'OK') 216 217 def test_logout(self): 218 rs = self.server.logout() 219 self.server = None 220 self.assertEqual(rs[0], 'BYE') 221 222 223@unittest.skipUnless(ssl, "SSL not available") 224class RemoteIMAP_SSLTest(RemoteIMAPTest): 225 port = 993 226 imap_class = IMAP4_SSL 227 228 def test_logincapa(self): 229 self.assertFalse('LOGINDISABLED' in self.server.capabilities) 230 self.assertTrue('AUTH=PLAIN' in self.server.capabilities) 231 232 233def test_main(): 234 tests = [TestImaplib] 235 236 if support.is_resource_enabled('network'): 237 if ssl: 238 global CERTFILE 239 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 240 "keycert.pem") 241 if not os.path.exists(CERTFILE): 242 raise support.TestFailed("Can't read certificate files!") 243 tests.extend([ 244 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, 245 RemoteIMAPTest, RemoteIMAP_SSLTest, 246 ]) 247 248 support.run_unittest(*tests) 249 250 251if __name__ == "__main__": 252 test_main() 253