import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
import hashlib
import hmac
import socket
import smtplib
import io
import re
import sys
import time
import select
import errno
import textwrap
import threading

import unittest
from test import support, mock_socket
from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import asyncore
from test.support import smtpd
from unittest.mock import Mock


support.requires_working_socket(module=True)

HOST = socket_helper.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt


def server(evt, buf, serv):
    """Accept one connection on *serv* and push *buf* to it.

    *evt* is set once the socket is listening and again when the function
    is done.  At most 500 select/send rounds are attempted.  Both the
    accepted connection and *serv* are always closed, even if sending
    fails part-way (e.g. the peer resets the connection).
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except TimeoutError:
        pass
    else:
        try:
            n = 500
            while buf and n > 0:
                r, w, e = select.select([], [conn], [])
                if w:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            # Release the connection even when select()/send() raised.
            conn.close()
    finally:
        serv.close()
        evt.set()


class GeneralTests:
    """Client-construction tests shared by SMTP and LMTP.

    Subclasses provide the ``client`` class attribute.  smtplib's socket
    module is replaced by mock_socket for the duration of each test.
    """

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port,
                             source_address=('127.0.0.1',19876))
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        client = self.client("%s:%s" % (HOST, self.port))
        client.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        client = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = self.client(HOST, self.port)
        finally:
            # always restore the (mock) global default
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = self.client(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        mock_socket.reply_with(b"220 Hola mundo")
        with self.assertRaises(ValueError):
            self.client(HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        client = self.client(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        # debug level 2 prefixes each line with a HH:MM:SS.ffffff timestamp
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)


class SMTPGeneralTests(GeneralTests, unittest.TestCase):

    client = smtplib.SMTP


class LMTPGeneralTests(GeneralTests, unittest.TestCase):

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        local_host = '/some/local/lmtp/delivery/program'
        mock_socket.reply_with(b"220 Hello world")
        try:
            client = self.client(local_host, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        super().testTimeoutZero()
        # a zero timeout must also be rejected for a Unix socket path
        local_host = '/some/local/lmtp/delivery/program'
        with self.assertRaises(ValueError):
            self.client(local_host, timeout=0)


# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Run the asyncore loop for *serv* until the client is finished.

    *serv_evt* is set when the thread starts and again when it exits.
    The loop polls at most 1000 times and stops early once *client_evt*
    is set.  On exit the server and all asyncore channels are closed.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except TimeoutError:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()


MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.

# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    """Drive smtplib.SMTP against a live smtpd.DebuggingServer thread.

    sys.stdout and smtpd.DEBUGSTREAM are captured so the tests can inspect
    what the server printed for each delivered message.
    """

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def get_output_without_xpeer(self):
        """Return the captured server output with the X-Peer header removed."""
        test_output = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      test_output, flags=re.MULTILINE|re.DOTALL)

    def testBasic(self):
        # connect
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # close the socket even if quit() fails, like every other test here
        self.addCleanup(smtp.close)
        smtp.quit()

    def testSourceAddress(self):
        # connect
        src_port = socket_helper.find_unused_port()
        try:
            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
                                timeout=support.LOOPBACK_TIMEOUT,
                                source_address=(self.host, src_port))
            self.addCleanup(smtp.close)
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            # the "unused" port may have been taken in the meantime
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # DebuggingServer answers EHLO with 250 and only the SIZE and HELP
        # extensions
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def test_issue43124_putcmd_escapes_newline(self):
        # see: https://bugs.python.org/issue43124
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as exc:
            smtp.putcmd('helo\nX-INJECTED')
        self.assertIn("prohibited newline characters", str(exc.exception))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('John', 'Sally', m)
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendBinary(self):
        m = b'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def test_issue43124_escape_localhostname(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as exc:
            smtp.sendmail("hi@me.com", "you@me.com", m)
        self.assertIn(
            "prohibited newline characters: ehlo hi\\nX-INJECTED",
            str(exc.exception),
        )
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        # the injected text must never have reached the server
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-INJECTED", debugout)

    def test_issue43124_escape_options(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)

        self.addCleanup(smtp.close)
        smtp.sendmail("hi@me.com", "you@me.com", m)
        with self.assertRaises(ValueError) as exc:
            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
        msg = str(exc.exception)
        self.assertIn("prohibited newline characters", msg)
        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-OPTION", debugout)
        self.assertNotIn("X-OPTION2", debugout)
        self.assertNotIn("X-INJECTED-1", debugout)
        self.assertNotIn("X-INJECTED-2", debugout)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('<>', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds as figuring out
        # exactly what IP address format is put there is not easy (and
        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
        # not always the same as socket.gethostbyname(HOST). :(
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        # The Bcc header should not be transmitted.
        del m['Bcc']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        # the header recipients must NOT have been used
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)
        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides From
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageMultipleResentRaises(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        # a second set of Resent-* headers makes the message ambiguous
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()


class NonConnectingTests(unittest.TestCase):
    """Tests that never open a connection to a server."""

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions (at present the attempt in SMTP.send
        # to reference the nonexistent 'sock' attribute of the SMTP object
        # causes an AttributeError)
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost:bogus")

    def testSockAttributeExists(self):
        # check that sock attribute is present outside of a connect() call
        # (regression test, the previous behavior raised an
        # AttributeError: 'SMTP' object has no attribute 'sock')
        with smtplib.SMTP() as smtp:
            self.assertIsNone(smtp.sock)


class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() passes on to sendmail()."""

    def setUp(self):
        self.msg = EmailMessage()
        self.msg['From'] = 'Páolo <főo@bar.com>'
        self.smtp = smtplib.SMTP()
        # stub out everything that would need a real connection
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()

    def testSendMessage(self):
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        # send twice: the options of the second call must equal the first
        self.smtp.send_message(self.msg)
        self.smtp.send_message(self.msg)
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)
        self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
                         expected_mail_options)

    def testSendMessageWithMailOptions(self):
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        # the caller's list must not have been mutated
        self.assertEqual(mail_options, ['STARTTLS'])
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)


# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):

    def setUp(self):
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)


class TooLongLineTests(unittest.TestCase):
    """The client must reject a reply line longer than smtplib._MAXLINE."""

    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = socket_helper.bind_port(self.sock)
        servargs = (self.evt, self.respdata, self.sock)
        self.thread = threading.Thread(target=server, args=servargs)
        self.thread.start()
        # wait until the server thread is listening
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)


sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

sim_auth = ('Mr.A@somewhere.com', 'somepassword')
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }

# Simulated SMTP channel & server
class ResponseException(Exception): pass
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd channel with canned responses and a minimal AUTH implementation.

    The ``*_response`` class attributes default to None, meaning "use the
    smtpd behaviour"; tests override them to force a specific reply.
    """

    quit_response = None
    mail_response = None
    rcpt_response = None
    data_response = None
    rcpt_count = 0
    rset_count = 0
    disconnect = 0
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # pre-render the extra EHLO lines once
        self._extrafeatures = ''.join(
            [ "250-{0}\r\n".format(x) for x in extra_features ])
        self.all_received_lines = []
        super(SimSMTPChannel, self).__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        if self.smtp_state == self.AUTH:
            # While authenticating, every line goes to the active mechanism
            # handler instead of the normal command dispatcher.
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                # NOTE(review): ResponseException as declared in this file
                # defines no smtp_code/smtp_error; a raiser is presumably
                # expected to attach them -- confirm.
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            return
        self.all_received_lines.append(self.received_lines)
        super().found_terminator()


    def smtp_AUTH(self, arg):
        """Validate an AUTH command and hand over to ``_auth_<mechanism>``."""
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if len(args) not in [1, 2]:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        # e.g. "CRAM-MD5" -> "_auth_cram_md5"
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      ' authentication mechanism {!r}'.format(auth_object_name))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        """Push the final AUTH reply and return to command state."""
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        if arg is None:
            # no initial response: send an empty challenge
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                # response is NUL-separated; the last two fields are
                # user and password
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_buggy(self, arg=None):
        # This AUTH mechanism will 'trap' client in a neverending 334
        # base64 encoded 'BuGgYbUgGy'
        self.push('334 QnVHZ1liVWdHeQ==')

    def _auth_cram_md5(self, arg=None):
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                return False
            # recompute the expected digest from the known password and
            # the challenge that was sent
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.
936 937 def smtp_EHLO(self, arg): 938 resp = ('250-testhost\r\n' 939 '250-EXPN\r\n' 940 '250-SIZE 20000000\r\n' 941 '250-STARTTLS\r\n' 942 '250-DELIVERBY\r\n') 943 resp = resp + self._extrafeatures + '250 HELP' 944 self.push(resp) 945 self.seen_greeting = arg 946 self.extended_smtp = True 947 948 def smtp_VRFY(self, arg): 949 # For max compatibility smtplib should be sending the raw address. 950 if arg in sim_users: 951 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 952 else: 953 self.push('550 No such user: %s' % arg) 954 955 def smtp_EXPN(self, arg): 956 list_name = arg.lower() 957 if list_name in sim_lists: 958 user_list = sim_lists[list_name] 959 for n, user_email in enumerate(user_list): 960 quoted_addr = smtplib.quoteaddr(user_email) 961 if n < len(user_list) - 1: 962 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 963 else: 964 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 965 else: 966 self.push('550 No access for you!') 967 968 def smtp_QUIT(self, arg): 969 if self.quit_response is None: 970 super(SimSMTPChannel, self).smtp_QUIT(arg) 971 else: 972 self.push(self.quit_response) 973 self.close_when_done() 974 975 def smtp_MAIL(self, arg): 976 if self.mail_response is None: 977 super().smtp_MAIL(arg) 978 else: 979 self.push(self.mail_response) 980 if self.disconnect: 981 self.close_when_done() 982 983 def smtp_RCPT(self, arg): 984 if self.rcpt_response is None: 985 super().smtp_RCPT(arg) 986 return 987 self.rcpt_count += 1 988 self.push(self.rcpt_response[self.rcpt_count-1]) 989 990 def smtp_RSET(self, arg): 991 self.rset_count += 1 992 super().smtp_RSET(arg) 993 994 def smtp_DATA(self, arg): 995 if self.data_response is None: 996 super().smtp_DATA(arg) 997 else: 998 self.push(self.data_response) 999 1000 def handle_error(self): 1001 raise 1002 1003 1004class SimSMTPServer(smtpd.SMTPServer): 1005 1006 channel_class = SimSMTPChannel 1007 1008 def __init__(self, *args, **kw): 1009 self._extra_features = [] 1010 
self._addresses = {} 1011 smtpd.SMTPServer.__init__(self, *args, **kw) 1012 1013 def handle_accepted(self, conn, addr): 1014 self._SMTPchannel = self.channel_class( 1015 self._extra_features, self, conn, addr, 1016 decode_data=self._decode_data) 1017 1018 def process_message(self, peer, mailfrom, rcpttos, data): 1019 self._addresses['from'] = mailfrom 1020 self._addresses['tos'] = rcpttos 1021 1022 def add_feature(self, feature): 1023 self._extra_features.append(feature) 1024 1025 def handle_error(self): 1026 raise 1027 1028 1029# Test various SMTP & ESMTP commands/behaviors that require a simulated server 1030# (i.e., something with more features than DebuggingServer) 1031class SMTPSimTests(unittest.TestCase): 1032 1033 def setUp(self): 1034 self.thread_key = threading_helper.threading_setup() 1035 self.real_getfqdn = socket.getfqdn 1036 socket.getfqdn = mock_socket.getfqdn 1037 self.serv_evt = threading.Event() 1038 self.client_evt = threading.Event() 1039 # Pick a random unused port by passing 0 for the port number 1040 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 1041 # Keep a note of what port was assigned 1042 self.port = self.serv.socket.getsockname()[1] 1043 serv_args = (self.serv, self.serv_evt, self.client_evt) 1044 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1045 self.thread.start() 1046 1047 # wait until server thread has assigned a port number 1048 self.serv_evt.wait() 1049 self.serv_evt.clear() 1050 1051 def tearDown(self): 1052 socket.getfqdn = self.real_getfqdn 1053 # indicate that the client is finished 1054 self.client_evt.set() 1055 # wait for the server thread to terminate 1056 self.serv_evt.wait() 1057 threading_helper.join_thread(self.thread) 1058 del self.thread 1059 self.doCleanups() 1060 threading_helper.threading_cleanup(*self.thread_key) 1061 1062 def testBasic(self): 1063 # smoke test 1064 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1065 
timeout=support.LOOPBACK_TIMEOUT) 1066 smtp.quit() 1067 1068 def testEHLO(self): 1069 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1070 timeout=support.LOOPBACK_TIMEOUT) 1071 1072 # no features should be present before the EHLO 1073 self.assertEqual(smtp.esmtp_features, {}) 1074 1075 # features expected from the test server 1076 expected_features = {'expn':'', 1077 'size': '20000000', 1078 'starttls': '', 1079 'deliverby': '', 1080 'help': '', 1081 } 1082 1083 smtp.ehlo() 1084 self.assertEqual(smtp.esmtp_features, expected_features) 1085 for k in expected_features: 1086 self.assertTrue(smtp.has_extn(k)) 1087 self.assertFalse(smtp.has_extn('unsupported-feature')) 1088 smtp.quit() 1089 1090 def testVRFY(self): 1091 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1092 timeout=support.LOOPBACK_TIMEOUT) 1093 1094 for addr_spec, name in sim_users.items(): 1095 expected_known = (250, bytes('%s %s' % 1096 (name, smtplib.quoteaddr(addr_spec)), 1097 "ascii")) 1098 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 1099 1100 u = 'nobody@nowhere.com' 1101 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 1102 self.assertEqual(smtp.vrfy(u), expected_unknown) 1103 smtp.quit() 1104 1105 def testEXPN(self): 1106 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1107 timeout=support.LOOPBACK_TIMEOUT) 1108 1109 for listname, members in sim_lists.items(): 1110 users = [] 1111 for m in members: 1112 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 1113 expected_known = (250, bytes('\n'.join(users), "ascii")) 1114 self.assertEqual(smtp.expn(listname), expected_known) 1115 1116 u = 'PSU-Members-List' 1117 expected_unknown = (550, b'No access for you!') 1118 self.assertEqual(smtp.expn(u), expected_unknown) 1119 smtp.quit() 1120 1121 def testAUTH_PLAIN(self): 1122 self.serv.add_feature("AUTH PLAIN") 1123 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1124 
timeout=support.LOOPBACK_TIMEOUT) 1125 resp = smtp.login(sim_auth[0], sim_auth[1]) 1126 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1127 smtp.close() 1128 1129 def testAUTH_LOGIN(self): 1130 self.serv.add_feature("AUTH LOGIN") 1131 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1132 timeout=support.LOOPBACK_TIMEOUT) 1133 resp = smtp.login(sim_auth[0], sim_auth[1]) 1134 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1135 smtp.close() 1136 1137 def testAUTH_LOGIN_initial_response_ok(self): 1138 self.serv.add_feature("AUTH LOGIN") 1139 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1140 timeout=support.LOOPBACK_TIMEOUT) as smtp: 1141 smtp.user, smtp.password = sim_auth 1142 smtp.ehlo("test_auth_login") 1143 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True) 1144 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1145 1146 def testAUTH_LOGIN_initial_response_notok(self): 1147 self.serv.add_feature("AUTH LOGIN") 1148 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1149 timeout=support.LOOPBACK_TIMEOUT) as smtp: 1150 smtp.user, smtp.password = sim_auth 1151 smtp.ehlo("test_auth_login") 1152 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False) 1153 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1154 1155 def testAUTH_BUGGY(self): 1156 self.serv.add_feature("AUTH BUGGY") 1157 1158 def auth_buggy(challenge=None): 1159 self.assertEqual(b"BuGgYbUgGy", challenge) 1160 return "\0" 1161 1162 smtp = smtplib.SMTP( 1163 HOST, self.port, local_hostname='localhost', 1164 timeout=support.LOOPBACK_TIMEOUT 1165 ) 1166 try: 1167 smtp.user, smtp.password = sim_auth 1168 smtp.ehlo("test_auth_buggy") 1169 expect = r"^Server AUTH mechanism infinite loop.*" 1170 with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm: 1171 smtp.auth("BUGGY", auth_buggy, initial_response_ok=False) 1172 finally: 1173 smtp.close() 1174 1175 
@hashlib_helper.requires_hashdigest('md5', openssl=True) 1176 def testAUTH_CRAM_MD5(self): 1177 self.serv.add_feature("AUTH CRAM-MD5") 1178 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1179 timeout=support.LOOPBACK_TIMEOUT) 1180 resp = smtp.login(sim_auth[0], sim_auth[1]) 1181 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1182 smtp.close() 1183 1184 @hashlib_helper.requires_hashdigest('md5', openssl=True) 1185 def testAUTH_multiple(self): 1186 # Test that multiple authentication methods are tried. 1187 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1188 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1189 timeout=support.LOOPBACK_TIMEOUT) 1190 resp = smtp.login(sim_auth[0], sim_auth[1]) 1191 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1192 smtp.close() 1193 1194 def test_auth_function(self): 1195 supported = {'PLAIN', 'LOGIN'} 1196 try: 1197 hashlib.md5() 1198 except ValueError: 1199 pass 1200 else: 1201 supported.add('CRAM-MD5') 1202 for mechanism in supported: 1203 self.serv.add_feature("AUTH {}".format(mechanism)) 1204 for mechanism in supported: 1205 with self.subTest(mechanism=mechanism): 1206 smtp = smtplib.SMTP(HOST, self.port, 1207 local_hostname='localhost', 1208 timeout=support.LOOPBACK_TIMEOUT) 1209 smtp.ehlo('foo') 1210 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1211 method = 'auth_' + mechanism.lower().replace('-', '_') 1212 resp = smtp.auth(mechanism, getattr(smtp, method)) 1213 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1214 smtp.close() 1215 1216 def test_quit_resets_greeting(self): 1217 smtp = smtplib.SMTP(HOST, self.port, 1218 local_hostname='localhost', 1219 timeout=support.LOOPBACK_TIMEOUT) 1220 code, message = smtp.ehlo() 1221 self.assertEqual(code, 250) 1222 self.assertIn('size', smtp.esmtp_features) 1223 smtp.quit() 1224 self.assertNotIn('size', smtp.esmtp_features) 1225 smtp.connect(HOST, self.port) 1226 self.assertNotIn('size', 
smtp.esmtp_features) 1227 smtp.ehlo_or_helo_if_needed() 1228 self.assertIn('size', smtp.esmtp_features) 1229 smtp.quit() 1230 1231 def test_with_statement(self): 1232 with smtplib.SMTP(HOST, self.port) as smtp: 1233 code, message = smtp.noop() 1234 self.assertEqual(code, 250) 1235 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1236 with smtplib.SMTP(HOST, self.port) as smtp: 1237 smtp.close() 1238 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1239 1240 def test_with_statement_QUIT_failure(self): 1241 with self.assertRaises(smtplib.SMTPResponseException) as error: 1242 with smtplib.SMTP(HOST, self.port) as smtp: 1243 smtp.noop() 1244 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1245 self.assertEqual(error.exception.smtp_code, 421) 1246 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1247 1248 #TODO: add tests for correct AUTH method fallback now that the 1249 #test infrastructure can support it. 1250 1251 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1252 def test__rest_from_mail_cmd(self): 1253 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1254 timeout=support.LOOPBACK_TIMEOUT) 1255 smtp.noop() 1256 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1257 self.serv._SMTPchannel.disconnect = True 1258 with self.assertRaises(smtplib.SMTPSenderRefused): 1259 smtp.sendmail('John', 'Sally', 'test message') 1260 self.assertIsNone(smtp.sock) 1261 1262 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1263 def test_421_from_mail_cmd(self): 1264 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1265 timeout=support.LOOPBACK_TIMEOUT) 1266 smtp.noop() 1267 self.serv._SMTPchannel.mail_response = '421 closing connection' 1268 with self.assertRaises(smtplib.SMTPSenderRefused): 1269 smtp.sendmail('John', 'Sally', 'test message') 1270 self.assertIsNone(smtp.sock) 1271 
self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1272 1273 def test_421_from_rcpt_cmd(self): 1274 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1275 timeout=support.LOOPBACK_TIMEOUT) 1276 smtp.noop() 1277 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1278 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1279 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1280 self.assertIsNone(smtp.sock) 1281 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1282 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1283 1284 def test_421_from_data_cmd(self): 1285 class MySimSMTPChannel(SimSMTPChannel): 1286 def found_terminator(self): 1287 if self.smtp_state == self.DATA: 1288 self.push('421 closing') 1289 else: 1290 super().found_terminator() 1291 self.serv.channel_class = MySimSMTPChannel 1292 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1293 timeout=support.LOOPBACK_TIMEOUT) 1294 smtp.noop() 1295 with self.assertRaises(smtplib.SMTPDataError): 1296 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1297 self.assertIsNone(smtp.sock) 1298 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1299 1300 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1301 smtp = smtplib.SMTP( 1302 HOST, self.port, local_hostname='localhost', 1303 timeout=support.LOOPBACK_TIMEOUT) 1304 self.addCleanup(smtp.close) 1305 smtp.ehlo() 1306 self.assertTrue(smtp.does_esmtp) 1307 self.assertFalse(smtp.has_extn('smtputf8')) 1308 self.assertRaises( 1309 smtplib.SMTPNotSupportedError, 1310 smtp.sendmail, 1311 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1312 self.assertRaises( 1313 smtplib.SMTPNotSupportedError, 1314 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1315 1316 def test_send_unicode_without_SMTPUTF8(self): 1317 smtp = smtplib.SMTP( 1318 HOST, self.port, local_hostname='localhost', 1319 
timeout=support.LOOPBACK_TIMEOUT) 1320 self.addCleanup(smtp.close) 1321 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1322 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1323 1324 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1325 # This test is located here and not in the SMTPUTF8SimTests 1326 # class because it needs a "regular" SMTP server to work 1327 msg = EmailMessage() 1328 msg['From'] = "Páolo <főo@bar.com>" 1329 msg['To'] = 'Dinsdale' 1330 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1331 smtp = smtplib.SMTP( 1332 HOST, self.port, local_hostname='localhost', 1333 timeout=support.LOOPBACK_TIMEOUT) 1334 self.addCleanup(smtp.close) 1335 with self.assertRaises(smtplib.SMTPNotSupportedError): 1336 smtp.send_message(msg) 1337 1338 def test_name_field_not_included_in_envelop_addresses(self): 1339 smtp = smtplib.SMTP( 1340 HOST, self.port, local_hostname='localhost', 1341 timeout=support.LOOPBACK_TIMEOUT) 1342 self.addCleanup(smtp.close) 1343 1344 message = EmailMessage() 1345 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1346 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1347 1348 self.assertDictEqual(smtp.send_message(message), {}) 1349 1350 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1351 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1352 1353 def test_lowercase_mail_from_rcpt_to(self): 1354 m = 'A test message' 1355 smtp = smtplib.SMTP( 1356 HOST, self.port, local_hostname='localhost', 1357 timeout=support.LOOPBACK_TIMEOUT) 1358 self.addCleanup(smtp.close) 1359 1360 smtp.sendmail('John', 'Sally', m) 1361 1362 self.assertIn(['mail from:<John> size=14'], self.serv._SMTPchannel.all_received_lines) 1363 self.assertIn(['rcpt to:<Sally>'], self.serv._SMTPchannel.all_received_lines) 1364 1365 1366class SimSMTPUTF8Server(SimSMTPServer): 1367 1368 def __init__(self, *args, **kw): 1369 # The base SMTP 
server turns these on automatically, but our test 1370 # server is set up to munge the EHLO response, so we need to provide 1371 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1372 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1373 smtpd.SMTPServer.__init__(self, *args, **kw) 1374 1375 def handle_accepted(self, conn, addr): 1376 self._SMTPchannel = self.channel_class( 1377 self._extra_features, self, conn, addr, 1378 decode_data=self._decode_data, 1379 enable_SMTPUTF8=self.enable_SMTPUTF8, 1380 ) 1381 1382 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1383 rcpt_options=None): 1384 self.last_peer = peer 1385 self.last_mailfrom = mailfrom 1386 self.last_rcpttos = rcpttos 1387 self.last_message = data 1388 self.last_mail_options = mail_options 1389 self.last_rcpt_options = rcpt_options 1390 1391 1392class SMTPUTF8SimTests(unittest.TestCase): 1393 1394 maxDiff = None 1395 1396 def setUp(self): 1397 self.thread_key = threading_helper.threading_setup() 1398 self.real_getfqdn = socket.getfqdn 1399 socket.getfqdn = mock_socket.getfqdn 1400 self.serv_evt = threading.Event() 1401 self.client_evt = threading.Event() 1402 # Pick a random unused port by passing 0 for the port number 1403 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1404 decode_data=False, 1405 enable_SMTPUTF8=True) 1406 # Keep a note of what port was assigned 1407 self.port = self.serv.socket.getsockname()[1] 1408 serv_args = (self.serv, self.serv_evt, self.client_evt) 1409 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1410 self.thread.start() 1411 1412 # wait until server thread has assigned a port number 1413 self.serv_evt.wait() 1414 self.serv_evt.clear() 1415 1416 def tearDown(self): 1417 socket.getfqdn = self.real_getfqdn 1418 # indicate that the client is finished 1419 self.client_evt.set() 1420 # wait for the server thread to terminate 1421 self.serv_evt.wait() 1422 threading_helper.join_thread(self.thread) 1423 
del self.thread 1424 self.doCleanups() 1425 threading_helper.threading_cleanup(*self.thread_key) 1426 1427 def test_test_server_supports_extensions(self): 1428 smtp = smtplib.SMTP( 1429 HOST, self.port, local_hostname='localhost', 1430 timeout=support.LOOPBACK_TIMEOUT) 1431 self.addCleanup(smtp.close) 1432 smtp.ehlo() 1433 self.assertTrue(smtp.does_esmtp) 1434 self.assertTrue(smtp.has_extn('smtputf8')) 1435 1436 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1437 m = '¡a test message containing unicode!'.encode('utf-8') 1438 smtp = smtplib.SMTP( 1439 HOST, self.port, local_hostname='localhost', 1440 timeout=support.LOOPBACK_TIMEOUT) 1441 self.addCleanup(smtp.close) 1442 smtp.sendmail('Jőhn', 'Sálly', m, 1443 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1444 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1445 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1446 self.assertEqual(self.serv.last_message, m) 1447 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1448 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1449 self.assertEqual(self.serv.last_rcpt_options, []) 1450 1451 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1452 m = '¡a test message containing unicode!'.encode('utf-8') 1453 smtp = smtplib.SMTP( 1454 HOST, self.port, local_hostname='localhost', 1455 timeout=support.LOOPBACK_TIMEOUT) 1456 self.addCleanup(smtp.close) 1457 smtp.ehlo() 1458 self.assertEqual( 1459 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1460 (250, b'OK')) 1461 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1462 self.assertEqual(smtp.data(m), (250, b'OK')) 1463 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1464 self.assertEqual(self.serv.last_rcpttos, ['János']) 1465 self.assertEqual(self.serv.last_message, m) 1466 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1467 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1468 self.assertEqual(self.serv.last_rcpt_options, []) 1469 1470 def 
test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1471 msg = EmailMessage() 1472 msg['From'] = "Páolo <főo@bar.com>" 1473 msg['To'] = 'Dinsdale' 1474 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1475 # XXX I don't know why I need two \n's here, but this is an existing 1476 # bug (if it is one) and not a problem with the new functionality. 1477 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1478 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1479 # we are successfully sending /r/n :(. 1480 expected = textwrap.dedent("""\ 1481 From: Páolo <főo@bar.com> 1482 To: Dinsdale 1483 Subject: Nudge nudge, wink, wink \u1F609 1484 Content-Type: text/plain; charset="utf-8" 1485 Content-Transfer-Encoding: 8bit 1486 MIME-Version: 1.0 1487 1488 oh là là, know what I mean, know what I mean? 1489 """) 1490 smtp = smtplib.SMTP( 1491 HOST, self.port, local_hostname='localhost', 1492 timeout=support.LOOPBACK_TIMEOUT) 1493 self.addCleanup(smtp.close) 1494 self.assertEqual(smtp.send_message(msg), {}) 1495 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1496 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1497 self.assertEqual(self.serv.last_message.decode(), expected) 1498 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1499 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1500 self.assertEqual(self.serv.last_rcpt_options, []) 1501 1502 1503EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1504 1505class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1506 def smtp_AUTH(self, arg): 1507 # RFC 4954's AUTH command allows for an optional initial-response. 1508 # Not all AUTH methods support this; some require a challenge. AUTH 1509 # PLAIN does those, so test that here. See issue #15014. 1510 args = arg.split() 1511 if args[0].lower() == 'plain': 1512 if len(args) == 2: 1513 # AUTH PLAIN <initial-response> with the response base 64 1514 # encoded. 
Hard code the expected response for the test. 1515 if args[1] == EXPECTED_RESPONSE: 1516 self.push('235 Ok') 1517 return 1518 self.push('571 Bad authentication') 1519 1520class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1521 channel_class = SimSMTPAUTHInitialResponseChannel 1522 1523 1524class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1525 def setUp(self): 1526 self.thread_key = threading_helper.threading_setup() 1527 self.real_getfqdn = socket.getfqdn 1528 socket.getfqdn = mock_socket.getfqdn 1529 self.serv_evt = threading.Event() 1530 self.client_evt = threading.Event() 1531 # Pick a random unused port by passing 0 for the port number 1532 self.serv = SimSMTPAUTHInitialResponseServer( 1533 (HOST, 0), ('nowhere', -1), decode_data=True) 1534 # Keep a note of what port was assigned 1535 self.port = self.serv.socket.getsockname()[1] 1536 serv_args = (self.serv, self.serv_evt, self.client_evt) 1537 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1538 self.thread.start() 1539 1540 # wait until server thread has assigned a port number 1541 self.serv_evt.wait() 1542 self.serv_evt.clear() 1543 1544 def tearDown(self): 1545 socket.getfqdn = self.real_getfqdn 1546 # indicate that the client is finished 1547 self.client_evt.set() 1548 # wait for the server thread to terminate 1549 self.serv_evt.wait() 1550 threading_helper.join_thread(self.thread) 1551 del self.thread 1552 self.doCleanups() 1553 threading_helper.threading_cleanup(*self.thread_key) 1554 1555 def testAUTH_PLAIN_initial_response_login(self): 1556 self.serv.add_feature('AUTH PLAIN') 1557 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1558 timeout=support.LOOPBACK_TIMEOUT) 1559 smtp.login('psu', 'doesnotexist') 1560 smtp.close() 1561 1562 def testAUTH_PLAIN_initial_response_auth(self): 1563 self.serv.add_feature('AUTH PLAIN') 1564 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1565 timeout=support.LOOPBACK_TIMEOUT) 1566 smtp.user = 'psu' 
1567 smtp.password = 'doesnotexist' 1568 code, response = smtp.auth('plain', smtp.auth_plain) 1569 smtp.close() 1570 self.assertEqual(code, 235) 1571 1572 1573if __name__ == '__main__': 1574 unittest.main() 1575