import asyncore
import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
import hmac
import socket
import smtpd
import smtplib
import io
import re
import sys
import time
import select
import errno
import textwrap

import unittest
from test import support, mock_socket

try:
    import threading
except ImportError:
    threading = None

HOST = support.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt


def server(evt, buf, serv):
    """Serve one connection on *serv* and write *buf* to the client.

    Used as a thread target.  *evt* is set twice: once after the socket
    has started listening, and once more after the listening socket has
    been closed.  If accept() times out, nothing is sent.
    """
    serv.listen()
    # First signal: the socket is listening.
    evt.set()
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        # Send buf in as many pieces as send() accepts, giving up after
        # 500 select()/send() rounds.
        n = 500
        while buf and n > 0:
            r, w, e = select.select([], [conn], [])
            if w:
                sent = conn.send(buf)
                buf = buf[sent:]

            n -= 1

        conn.close()
    finally:
        serv.close()
        # Second signal: the server is done and its socket is closed.
        evt.set()

class GeneralTests(unittest.TestCase):
    """Basic smtplib.SMTP checks run against test.mock_socket."""

    def setUp(self):
        # Route all of smtplib's socket use through the mock module.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        # Put the real socket module back.
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port,
                source_address=('127.0.0.1',19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            # Always restore the mock's default timeout.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # NOTE(review): this changes the default timeout of the real socket
        # module, while the connection itself goes through mock_socket.
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        # At debug level 1 the line starts directly with "connect:".
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        # At debug level 2 the line is prefixed with an HH:MM:SS.ffffff stamp.
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)


# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for *serv* until the client is finished.

    *serv_evt* is set on entry and again on exit.  The loop polls
    asyncore.socket_map (timeout argument 0.01) for at most 1000 rounds and
    stops early, closing *serv*, once *client_evt* has been set.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()

MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.
# Test behavior of smtpd.DebuggingServer
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM

    def testBasic(self):
        # connect
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.quit()

    def testSourceAddress(self):
        # connect
        port = support.find_unused_port()
        try:
            smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                    timeout=3, source_address=('127.0.0.1', port))
            self.assertEqual(smtp.source_address, ('127.0.0.1', port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime; that is
            # not a failure of the code under test.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def testNOOP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # DebuggingServer answers EHLO with the SIZE and HELP extensions.
        # (The method name keeps its historical spelling.)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        # Let the server thread finish before inspecting what it printed.
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendBinary(self):
        m = b'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('<>', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        # The Bcc header should not be transmitted.
        del m['Bcc']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        # The header recipients must not appear in the envelope ...
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)
        # ... only the explicitly passed one.
        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # The Sender header takes precedence over From as envelope sender
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # The Bcc and Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageMultipleResentRaises(self):
        # Two sets of Resent-* headers are ambiguous; send_message must
        # refuse the message with ValueError.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()

class NonConnectingTests(unittest.TestCase):

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions: with no connection established, both
        # ehlo() and send() must raise SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost:bogus")


# test response of client to a non-successful HELO message
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):

    def setUp(self):
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                            HOST, self.port, 'localhost', 3)


# test that the client rejects a reply line longer than smtplib._MAXLINE
@unittest.skipUnless(threading, 'Threading required for this test.')
class TooLongLineTests(unittest.TestCase):
    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = support.bind_port(self.sock)
        servargs = (self.evt, self.respdata, self.sock)
        # Keep a reference to the thread so tearDown can join it instead of
        # leaving it dangling after the test.
        self.thread = threading.Thread(target=server, args=servargs)
        self.thread.start()
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # server() sets the event as its very last step, so the join below
        # returns promptly.
        self.evt.wait()
        self.thread.join()
        sys.stdout = self.old_stdout

    def testLineTooLong(self):
        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)


# Address -> display name for the users known to the simulated server.
sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

# (user, password) accepted by the simulated AUTH mechanisms.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Base64 challenge sent for AUTH CRAM-MD5.
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# List name -> member addresses, used by EXPN.
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }

# Simulated SMTP channel & server
# NOTE(review): found_terminator reads .smtp_code and .smtp_error from a caught
# ResponseException; nothing in the visible code raises one or sets those.
class ResponseException(Exception): pass
class SimSMTPChannel(smtpd.SMTPChannel):

    # Canned replies; when left at None the inherited handler is used.
    quit_response = None
    mail_response = None
    rcpt_response = None
    data_response = None
    rcpt_count = 0
    rset_count = 0
    disconnect = 0
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the extra EHLO lines advertised by smtp_EHLO.
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(x) for x in extra_features)
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        """Feed a received line to the active auth handler while in AUTH state.

        Outside the AUTH state the inherited handler processes the line.
        """
        if self.smtp_state == self.AUTH:
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            # The line was auth data; never hand it to the command parser.
            return
        super().found_terminator()


    def smtp_AUTH(self, arg):
        """Handle "AUTH <mechanism> [initial-response]"."""
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if len(args) not in [1, 2]:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        # "CRAM-MD5" -> "_auth_cram_md5", etc.
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      'authentication mechanism {!r}'.format(auth_object_name))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        """Send the final AUTH reply and return to the COMMAND state."""
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        """AUTH PLAIN: one response holding NUL-separated user and password."""
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                # The exchange is over; do not treat the client's next
                # command as more auth data.
                self.smtp_state = self.COMMAND
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        """AUTH LOGIN: prompt for the user name, then for the password."""
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_cram_md5(self, arg=None):
        """AUTH CRAM-MD5: send the challenge, then verify "user digest"."""
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                # The exchange is over; do not treat the client's next
                # command as more auth data.
                self.smtp_state = self.COMMAND
                return
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        # Fixed feature list, followed by whatever the server added through
        # SimSMTPServer.add_feature().
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        list_name = arg.lower()
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                # Every line but the last is a "250-" continuation line.
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

    def smtp_QUIT(self, arg):
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # rcpt_response is a list: the n-th RCPT command gets the n-th reply.
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        # Count RSETs so tests can assert that none was sent.
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Propagate the active exception rather than using the inherited
        # error handler.
        raise


class SimSMTPServer(smtpd.SMTPServer):

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._extra_features = []
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep the channel reachable so tests can set canned replies on it.
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)

    def process_message(self, peer, mailfrom, rcpttos, data):
        pass

    def add_feature(self, feature):
        self._extra_features.append(feature)

    def handle_error(self):
        # Propagate the active exception rather than using the inherited
        # error handler.
        raise


# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = smtplib.SMTP(HOST, self.port,
                                    local_hostname='localhost', timeout=15)
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                # "CRAM-MD5" -> smtp.auth_cram_md5, etc.
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost',
                            timeout=15)
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class
= MySimSMTPChannel 1038 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1039 smtp.noop() 1040 with self.assertRaises(smtplib.SMTPDataError): 1041 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1042 self.assertIsNone(smtp.sock) 1043 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1044 1045 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1046 smtp = smtplib.SMTP( 1047 HOST, self.port, local_hostname='localhost', timeout=3) 1048 self.addCleanup(smtp.close) 1049 smtp.ehlo() 1050 self.assertTrue(smtp.does_esmtp) 1051 self.assertFalse(smtp.has_extn('smtputf8')) 1052 self.assertRaises( 1053 smtplib.SMTPNotSupportedError, 1054 smtp.sendmail, 1055 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1056 self.assertRaises( 1057 smtplib.SMTPNotSupportedError, 1058 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1059 1060 def test_send_unicode_without_SMTPUTF8(self): 1061 smtp = smtplib.SMTP( 1062 HOST, self.port, local_hostname='localhost', timeout=3) 1063 self.addCleanup(smtp.close) 1064 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1065 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1066 1067 1068class SimSMTPUTF8Server(SimSMTPServer): 1069 1070 def __init__(self, *args, **kw): 1071 # The base SMTP server turns these on automatically, but our test 1072 # server is set up to munge the EHLO response, so we need to provide 1073 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 
1074 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1075 smtpd.SMTPServer.__init__(self, *args, **kw) 1076 1077 def handle_accepted(self, conn, addr): 1078 self._SMTPchannel = self.channel_class( 1079 self._extra_features, self, conn, addr, 1080 decode_data=self._decode_data, 1081 enable_SMTPUTF8=self.enable_SMTPUTF8, 1082 ) 1083 1084 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1085 rcpt_options=None): 1086 self.last_peer = peer 1087 self.last_mailfrom = mailfrom 1088 self.last_rcpttos = rcpttos 1089 self.last_message = data 1090 self.last_mail_options = mail_options 1091 self.last_rcpt_options = rcpt_options 1092 1093 1094@unittest.skipUnless(threading, 'Threading required for this test.') 1095class SMTPUTF8SimTests(unittest.TestCase): 1096 1097 maxDiff = None 1098 1099 def setUp(self): 1100 self.real_getfqdn = socket.getfqdn 1101 socket.getfqdn = mock_socket.getfqdn 1102 self.serv_evt = threading.Event() 1103 self.client_evt = threading.Event() 1104 # Pick a random unused port by passing 0 for the port number 1105 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1106 decode_data=False, 1107 enable_SMTPUTF8=True) 1108 # Keep a note of what port was assigned 1109 self.port = self.serv.socket.getsockname()[1] 1110 serv_args = (self.serv, self.serv_evt, self.client_evt) 1111 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1112 self.thread.start() 1113 1114 # wait until server thread has assigned a port number 1115 self.serv_evt.wait() 1116 self.serv_evt.clear() 1117 1118 def tearDown(self): 1119 socket.getfqdn = self.real_getfqdn 1120 # indicate that the client is finished 1121 self.client_evt.set() 1122 # wait for the server thread to terminate 1123 self.serv_evt.wait() 1124 self.thread.join() 1125 1126 def test_test_server_supports_extensions(self): 1127 smtp = smtplib.SMTP( 1128 HOST, self.port, local_hostname='localhost', timeout=3) 1129 self.addCleanup(smtp.close) 1130 smtp.ehlo() 1131 
self.assertTrue(smtp.does_esmtp) 1132 self.assertTrue(smtp.has_extn('smtputf8')) 1133 1134 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1135 m = '¡a test message containing unicode!'.encode('utf-8') 1136 smtp = smtplib.SMTP( 1137 HOST, self.port, local_hostname='localhost', timeout=3) 1138 self.addCleanup(smtp.close) 1139 smtp.sendmail('Jőhn', 'Sálly', m, 1140 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1141 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1142 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1143 self.assertEqual(self.serv.last_message, m) 1144 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1145 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1146 self.assertEqual(self.serv.last_rcpt_options, []) 1147 1148 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1149 m = '¡a test message containing unicode!'.encode('utf-8') 1150 smtp = smtplib.SMTP( 1151 HOST, self.port, local_hostname='localhost', timeout=3) 1152 self.addCleanup(smtp.close) 1153 smtp.ehlo() 1154 self.assertEqual( 1155 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1156 (250, b'OK')) 1157 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1158 self.assertEqual(smtp.data(m), (250, b'OK')) 1159 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1160 self.assertEqual(self.serv.last_rcpttos, ['János']) 1161 self.assertEqual(self.serv.last_message, m) 1162 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1163 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1164 self.assertEqual(self.serv.last_rcpt_options, []) 1165 1166 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1167 msg = EmailMessage() 1168 msg['From'] = "Páolo <főo@bar.com>" 1169 msg['To'] = 'Dinsdale' 1170 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1171 # XXX I don't know why I need two \n's here, but this is an existing 1172 # bug (if it is one) and not a problem with the new functionality. 
1173 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1174 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1175 # we are successfully sending /r/n :(. 1176 expected = textwrap.dedent("""\ 1177 From: Páolo <főo@bar.com> 1178 To: Dinsdale 1179 Subject: Nudge nudge, wink, wink \u1F609 1180 Content-Type: text/plain; charset="utf-8" 1181 Content-Transfer-Encoding: 8bit 1182 MIME-Version: 1.0 1183 1184 oh là là, know what I mean, know what I mean? 1185 """) 1186 smtp = smtplib.SMTP( 1187 HOST, self.port, local_hostname='localhost', timeout=3) 1188 self.addCleanup(smtp.close) 1189 self.assertEqual(smtp.send_message(msg), {}) 1190 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1191 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1192 self.assertEqual(self.serv.last_message.decode(), expected) 1193 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1194 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1195 self.assertEqual(self.serv.last_rcpt_options, []) 1196 1197 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1198 msg = EmailMessage() 1199 msg['From'] = "Páolo <főo@bar.com>" 1200 msg['To'] = 'Dinsdale' 1201 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1202 smtp = smtplib.SMTP( 1203 HOST, self.port, local_hostname='localhost', timeout=3) 1204 self.addCleanup(smtp.close) 1205 self.assertRaises(smtplib.SMTPNotSupportedError, 1206 smtp.send_message(msg)) 1207 1208 1209EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1210 1211class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1212 def smtp_AUTH(self, arg): 1213 # RFC 4954's AUTH command allows for an optional initial-response. 1214 # Not all AUTH methods support this; some require a challenge. AUTH 1215 # PLAIN does those, so test that here. See issue #15014. 
1216 args = arg.split() 1217 if args[0].lower() == 'plain': 1218 if len(args) == 2: 1219 # AUTH PLAIN <initial-response> with the response base 64 1220 # encoded. Hard code the expected response for the test. 1221 if args[1] == EXPECTED_RESPONSE: 1222 self.push('235 Ok') 1223 return 1224 self.push('571 Bad authentication') 1225 1226class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1227 channel_class = SimSMTPAUTHInitialResponseChannel 1228 1229 1230@unittest.skipUnless(threading, 'Threading required for this test.') 1231class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1232 def setUp(self): 1233 self.real_getfqdn = socket.getfqdn 1234 socket.getfqdn = mock_socket.getfqdn 1235 self.serv_evt = threading.Event() 1236 self.client_evt = threading.Event() 1237 # Pick a random unused port by passing 0 for the port number 1238 self.serv = SimSMTPAUTHInitialResponseServer( 1239 (HOST, 0), ('nowhere', -1), decode_data=True) 1240 # Keep a note of what port was assigned 1241 self.port = self.serv.socket.getsockname()[1] 1242 serv_args = (self.serv, self.serv_evt, self.client_evt) 1243 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1244 self.thread.start() 1245 1246 # wait until server thread has assigned a port number 1247 self.serv_evt.wait() 1248 self.serv_evt.clear() 1249 1250 def tearDown(self): 1251 socket.getfqdn = self.real_getfqdn 1252 # indicate that the client is finished 1253 self.client_evt.set() 1254 # wait for the server thread to terminate 1255 self.serv_evt.wait() 1256 self.thread.join() 1257 1258 def testAUTH_PLAIN_initial_response_login(self): 1259 self.serv.add_feature('AUTH PLAIN') 1260 smtp = smtplib.SMTP(HOST, self.port, 1261 local_hostname='localhost', timeout=15) 1262 smtp.login('psu', 'doesnotexist') 1263 smtp.close() 1264 1265 def testAUTH_PLAIN_initial_response_auth(self): 1266 self.serv.add_feature('AUTH PLAIN') 1267 smtp = smtplib.SMTP(HOST, self.port, 1268 local_hostname='localhost', timeout=15) 1269 smtp.user 
= 'psu' 1270 smtp.password = 'doesnotexist' 1271 code, response = smtp.auth('plain', smtp.auth_plain) 1272 smtp.close() 1273 self.assertEqual(code, 235) 1274 1275 1276@support.reap_threads 1277def test_main(verbose=None): 1278 support.run_unittest( 1279 BadHELOServerTests, 1280 DebuggingServerTests, 1281 GeneralTests, 1282 NonConnectingTests, 1283 SMTPAUTHInitialResponseSimTests, 1284 SMTPSimTests, 1285 TooLongLineTests, 1286 ) 1287 1288 1289if __name__ == '__main__': 1290 test_main() 1291