1import asyncore 2import base64 3import email.mime.text 4from email.message import EmailMessage 5from email.base64mime import body_encode as encode_base64 6import email.utils 7import hashlib 8import hmac 9import socket 10import smtpd 11import smtplib 12import io 13import re 14import sys 15import time 16import select 17import errno 18import textwrap 19import threading 20 21import unittest 22from test import support, mock_socket 23from test.support import hashlib_helper 24from test.support import socket_helper 25from test.support import threading_setup, threading_cleanup, join_thread 26from unittest.mock import Mock 27 28HOST = socket_helper.HOST 29 30if sys.platform == 'darwin': 31 # select.poll returns a select.POLLHUP at the end of the tests 32 # on darwin, so just ignore it 33 def handle_expt(self): 34 pass 35 smtpd.SMTPChannel.handle_expt = handle_expt 36 37 38def server(evt, buf, serv): 39 serv.listen() 40 evt.set() 41 try: 42 conn, addr = serv.accept() 43 except socket.timeout: 44 pass 45 else: 46 n = 500 47 while buf and n > 0: 48 r, w, e = select.select([], [conn], []) 49 if w: 50 sent = conn.send(buf) 51 buf = buf[sent:] 52 53 n -= 1 54 55 conn.close() 56 finally: 57 serv.close() 58 evt.set() 59 60class GeneralTests: 61 62 def setUp(self): 63 smtplib.socket = mock_socket 64 self.port = 25 65 66 def tearDown(self): 67 smtplib.socket = socket 68 69 # This method is no longer used but is retained for backward compatibility, 70 # so test to make sure it still works. 71 def testQuoteData(self): 72 teststr = "abc\n.jkl\rfoo\r\n..blue" 73 expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 74 self.assertEqual(expected, smtplib.quotedata(teststr)) 75 76 def testBasic1(self): 77 mock_socket.reply_with(b"220 Hola mundo") 78 # connects 79 client = self.client(HOST, self.port) 80 client.close() 81 82 def testSourceAddress(self): 83 mock_socket.reply_with(b"220 Hola mundo") 84 # connects 85 client = self.client(HOST, self.port, 86 source_address=('127.0.0.1',19876)) 87 self.assertEqual(client.source_address, ('127.0.0.1', 19876)) 88 client.close() 89 90 def testBasic2(self): 91 mock_socket.reply_with(b"220 Hola mundo") 92 # connects, include port in host name 93 client = self.client("%s:%s" % (HOST, self.port)) 94 client.close() 95 96 def testLocalHostName(self): 97 mock_socket.reply_with(b"220 Hola mundo") 98 # check that supplied local_hostname is used 99 client = self.client(HOST, self.port, local_hostname="testhost") 100 self.assertEqual(client.local_hostname, "testhost") 101 client.close() 102 103 def testTimeoutDefault(self): 104 mock_socket.reply_with(b"220 Hola mundo") 105 self.assertIsNone(mock_socket.getdefaulttimeout()) 106 mock_socket.setdefaulttimeout(30) 107 self.assertEqual(mock_socket.getdefaulttimeout(), 30) 108 try: 109 client = self.client(HOST, self.port) 110 finally: 111 mock_socket.setdefaulttimeout(None) 112 self.assertEqual(client.sock.gettimeout(), 30) 113 client.close() 114 115 def testTimeoutNone(self): 116 mock_socket.reply_with(b"220 Hola mundo") 117 self.assertIsNone(socket.getdefaulttimeout()) 118 socket.setdefaulttimeout(30) 119 try: 120 client = self.client(HOST, self.port, timeout=None) 121 finally: 122 socket.setdefaulttimeout(None) 123 self.assertIsNone(client.sock.gettimeout()) 124 client.close() 125 126 def testTimeoutZero(self): 127 mock_socket.reply_with(b"220 Hola mundo") 128 with self.assertRaises(ValueError): 129 self.client(HOST, self.port, timeout=0) 130 131 def testTimeoutValue(self): 132 mock_socket.reply_with(b"220 Hola mundo") 133 client = self.client(HOST, self.port, timeout=30) 134 self.assertEqual(client.sock.gettimeout(), 30) 135 client.close() 136 137 def test_debuglevel(self): 138 mock_socket.reply_with(b"220 Hello world") 139 client = self.client() 140 client.set_debuglevel(1) 141 with support.captured_stderr() as stderr: 142 client.connect(HOST, self.port) 143 client.close() 144 expected = re.compile(r"^connect:", re.MULTILINE) 145 self.assertRegex(stderr.getvalue(), expected) 146 147 def test_debuglevel_2(self): 148 mock_socket.reply_with(b"220 Hello world") 149 client = self.client() 150 client.set_debuglevel(2) 151 with support.captured_stderr() as stderr: 152 client.connect(HOST, self.port) 153 client.close() 154 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 155 re.MULTILINE) 156 self.assertRegex(stderr.getvalue(), expected) 157 158 159class SMTPGeneralTests(GeneralTests, unittest.TestCase): 160 161 client = smtplib.SMTP 162 163 164class LMTPGeneralTests(GeneralTests, unittest.TestCase): 165 166 client = smtplib.LMTP 167 168 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket") 169 def testUnixDomainSocketTimeoutDefault(self): 170 local_host = '/some/local/lmtp/delivery/program' 171 mock_socket.reply_with(b"220 Hello world") 172 try: 173 client = self.client(local_host, self.port) 174 finally: 175 mock_socket.setdefaulttimeout(None) 176 self.assertIsNone(client.sock.gettimeout()) 177 client.close() 178 179 def testTimeoutZero(self): 180 super().testTimeoutZero() 181 local_host = '/some/local/lmtp/delivery/program' 182 with self.assertRaises(ValueError): 183 self.client(local_host, timeout=0) 184 185# Test server thread using the specified SMTP server class 186def debugging_server(serv, serv_evt, client_evt): 187 serv_evt.set() 188 189 try: 190 if hasattr(select, 'poll'): 191 poll_fun = asyncore.poll2 192 else: 193 poll_fun = asyncore.poll 194 195 n = 1000 196 while asyncore.socket_map and n > 0: 197 poll_fun(0.01, asyncore.socket_map) 198 199 # when the client conversation is finished, it will 200 # set client_evt, and it's then ok to kill the server 201 if client_evt.is_set(): 202 serv.close() 203 break 204 205 n -= 1 206 207 except socket.timeout: 208 pass 209 finally: 210 if not client_evt.is_set(): 211 # allow some time for the client to read the result 212 time.sleep(0.5) 213 serv.close() 214 asyncore.close_all() 215 serv_evt.set() 216 217MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 218MSG_END = '------------ END MESSAGE ------------\n' 219 220# NOTE: Some SMTP objects in the tests below are created with a non-default 221# local_hostname argument to the constructor, since (on some systems) the FQDN 222# lookup caused by the default local_hostname sometimes takes so long that the 223# test server times out, causing the test to fail. 224 225# Test behavior of smtpd.DebuggingServer 226class DebuggingServerTests(unittest.TestCase): 227 228 maxDiff = None 229 230 def setUp(self): 231 self.thread_key = threading_setup() 232 self.real_getfqdn = socket.getfqdn 233 socket.getfqdn = mock_socket.getfqdn 234 # temporarily replace sys.stdout to capture DebuggingServer output 235 self.old_stdout = sys.stdout 236 self.output = io.StringIO() 237 sys.stdout = self.output 238 239 self.serv_evt = threading.Event() 240 self.client_evt = threading.Event() 241 # Capture SMTPChannel debug output 242 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 243 smtpd.DEBUGSTREAM = io.StringIO() 244 # Pick a random unused port by passing 0 for the port number 245 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 246 decode_data=True) 247 # Keep a note of what server host and port were assigned 248 self.host, self.port = self.serv.socket.getsockname()[:2] 249 serv_args = (self.serv, self.serv_evt, self.client_evt) 250 self.thread = threading.Thread(target=debugging_server, args=serv_args) 251 self.thread.start() 252 253 # wait until server thread has assigned a port number 254 self.serv_evt.wait() 255 self.serv_evt.clear() 256 257 def tearDown(self): 258 socket.getfqdn = self.real_getfqdn 259 # indicate that the client is finished 260 self.client_evt.set() 261 # wait for the server thread to terminate 262 self.serv_evt.wait() 263 join_thread(self.thread) 264 # restore sys.stdout 265 sys.stdout = self.old_stdout 266 # restore DEBUGSTREAM 267 smtpd.DEBUGSTREAM.close() 268 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 269 del self.thread 270 self.doCleanups() 271 threading_cleanup(*self.thread_key) 272 273 def get_output_without_xpeer(self): 274 test_output = self.output.getvalue() 275 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 276 test_output, flags=re.MULTILINE|re.DOTALL) 277 278 def testBasic(self): 279 # connect 280 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 281 timeout=support.LOOPBACK_TIMEOUT) 282 smtp.quit() 283 284 def testSourceAddress(self): 285 # connect 286 src_port = socket_helper.find_unused_port() 287 try: 288 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 289 timeout=support.LOOPBACK_TIMEOUT, 290 source_address=(self.host, src_port)) 291 self.addCleanup(smtp.close) 292 self.assertEqual(smtp.source_address, (self.host, src_port)) 293 self.assertEqual(smtp.local_hostname, 'localhost') 294 smtp.quit() 295 except OSError as e: 296 if e.errno == errno.EADDRINUSE: 297 self.skipTest("couldn't bind to source port %d" % src_port) 298 raise 299 300 def testNOOP(self): 301 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 302 timeout=support.LOOPBACK_TIMEOUT) 303 self.addCleanup(smtp.close) 304 expected = (250, b'OK') 305 self.assertEqual(smtp.noop(), expected) 306 smtp.quit() 307 308 def testRSET(self): 309 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 310 timeout=support.LOOPBACK_TIMEOUT) 311 self.addCleanup(smtp.close) 312 expected = (250, b'OK') 313 self.assertEqual(smtp.rset(), expected) 314 smtp.quit() 315 316 def testELHO(self): 317 # EHLO isn't implemented in DebuggingServer 318 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 319 timeout=support.LOOPBACK_TIMEOUT) 320 self.addCleanup(smtp.close) 321 expected = (250, b'\nSIZE 33554432\nHELP') 322 self.assertEqual(smtp.ehlo(), expected) 323 smtp.quit() 324 325 def testEXPNNotImplemented(self): 326 # EXPN isn't implemented in DebuggingServer 327 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 328 timeout=support.LOOPBACK_TIMEOUT) 329 self.addCleanup(smtp.close) 330 expected = (502, b'EXPN not implemented') 331 smtp.putcmd('EXPN') 332 self.assertEqual(smtp.getreply(), expected) 333 smtp.quit() 334 335 def testVRFY(self): 336 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 337 timeout=support.LOOPBACK_TIMEOUT) 338 self.addCleanup(smtp.close) 339 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 340 b'and attempt delivery') 341 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 342 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 343 smtp.quit() 344 345 def testSecondHELO(self): 346 # check that a second HELO returns a message that it's a duplicate 347 # (this behavior is specific to smtpd.SMTPChannel) 348 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 349 timeout=support.LOOPBACK_TIMEOUT) 350 self.addCleanup(smtp.close) 351 smtp.helo() 352 expected = (503, b'Duplicate HELO/EHLO') 353 self.assertEqual(smtp.helo(), expected) 354 smtp.quit() 355 356 def testHELP(self): 357 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 358 timeout=support.LOOPBACK_TIMEOUT) 359 self.addCleanup(smtp.close) 360 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 361 b'RCPT DATA RSET NOOP QUIT VRFY') 362 smtp.quit() 363 364 def testSend(self): 365 # connect and send mail 366 m = 'A test message' 367 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 368 timeout=support.LOOPBACK_TIMEOUT) 369 self.addCleanup(smtp.close) 370 smtp.sendmail('John', 'Sally', m) 371 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 372 # in asyncore. This sleep might help, but should really be fixed 373 # properly by using an Event variable. 374 time.sleep(0.01) 375 smtp.quit() 376 377 self.client_evt.set() 378 self.serv_evt.wait() 379 self.output.flush() 380 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 381 self.assertEqual(self.output.getvalue(), mexpect) 382 383 def testSendBinary(self): 384 m = b'A test message' 385 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 386 timeout=support.LOOPBACK_TIMEOUT) 387 self.addCleanup(smtp.close) 388 smtp.sendmail('John', 'Sally', m) 389 # XXX (see comment in testSend) 390 time.sleep(0.01) 391 smtp.quit() 392 393 self.client_evt.set() 394 self.serv_evt.wait() 395 self.output.flush() 396 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 397 self.assertEqual(self.output.getvalue(), mexpect) 398 399 def testSendNeedingDotQuote(self): 400 # Issue 12283 401 m = '.A test\n.mes.sage.' 402 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 403 timeout=support.LOOPBACK_TIMEOUT) 404 self.addCleanup(smtp.close) 405 smtp.sendmail('John', 'Sally', m) 406 # XXX (see comment in testSend) 407 time.sleep(0.01) 408 smtp.quit() 409 410 self.client_evt.set() 411 self.serv_evt.wait() 412 self.output.flush() 413 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 414 self.assertEqual(self.output.getvalue(), mexpect) 415 416 def testSendNullSender(self): 417 m = 'A test message' 418 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 419 timeout=support.LOOPBACK_TIMEOUT) 420 self.addCleanup(smtp.close) 421 smtp.sendmail('<>', 'Sally', m) 422 # XXX (see comment in testSend) 423 time.sleep(0.01) 424 smtp.quit() 425 426 self.client_evt.set() 427 self.serv_evt.wait() 428 self.output.flush() 429 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 430 self.assertEqual(self.output.getvalue(), mexpect) 431 debugout = smtpd.DEBUGSTREAM.getvalue() 432 sender = re.compile("^sender: <>$", re.MULTILINE) 433 self.assertRegex(debugout, sender) 434 435 def testSendMessage(self): 436 m = email.mime.text.MIMEText('A test message') 437 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 438 timeout=support.LOOPBACK_TIMEOUT) 439 self.addCleanup(smtp.close) 440 smtp.send_message(m, from_addr='John', to_addrs='Sally') 441 # XXX (see comment in testSend) 442 time.sleep(0.01) 443 smtp.quit() 444 445 self.client_evt.set() 446 self.serv_evt.wait() 447 self.output.flush() 448 # Remove the X-Peer header that DebuggingServer adds as figuring out 449 # exactly what IP address format is put there is not easy (and 450 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 451 # not always the same as socket.gethostbyname(HOST). :( 452 test_output = self.get_output_without_xpeer() 453 del m['X-Peer'] 454 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 455 self.assertEqual(test_output, mexpect) 456 457 def testSendMessageWithAddresses(self): 458 m = email.mime.text.MIMEText('A test message') 459 m['From'] = 'foo@bar.com' 460 m['To'] = 'John' 461 m['CC'] = 'Sally, Fred' 462 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 463 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 464 timeout=support.LOOPBACK_TIMEOUT) 465 self.addCleanup(smtp.close) 466 smtp.send_message(m) 467 # XXX (see comment in testSend) 468 time.sleep(0.01) 469 smtp.quit() 470 # make sure the Bcc header is still in the message. 471 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 472 '<warped@silly.walks.com>') 473 474 self.client_evt.set() 475 self.serv_evt.wait() 476 self.output.flush() 477 # Remove the X-Peer header that DebuggingServer adds. 478 test_output = self.get_output_without_xpeer() 479 del m['X-Peer'] 480 # The Bcc header should not be transmitted. 481 del m['Bcc'] 482 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 483 self.assertEqual(test_output, mexpect) 484 debugout = smtpd.DEBUGSTREAM.getvalue() 485 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 486 self.assertRegex(debugout, sender) 487 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 488 'warped@silly.walks.com'): 489 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 490 re.MULTILINE) 491 self.assertRegex(debugout, to_addr) 492 493 def testSendMessageWithSomeAddresses(self): 494 # Make sure nothing breaks if not all of the three 'to' headers exist 495 m = email.mime.text.MIMEText('A test message') 496 m['From'] = 'foo@bar.com' 497 m['To'] = 'John, Dinsdale' 498 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 499 timeout=support.LOOPBACK_TIMEOUT) 500 self.addCleanup(smtp.close) 501 smtp.send_message(m) 502 # XXX (see comment in testSend) 503 time.sleep(0.01) 504 smtp.quit() 505 506 self.client_evt.set() 507 self.serv_evt.wait() 508 self.output.flush() 509 # Remove the X-Peer header that DebuggingServer adds. 510 test_output = self.get_output_without_xpeer() 511 del m['X-Peer'] 512 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 513 self.assertEqual(test_output, mexpect) 514 debugout = smtpd.DEBUGSTREAM.getvalue() 515 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 516 self.assertRegex(debugout, sender) 517 for addr in ('John', 'Dinsdale'): 518 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 519 re.MULTILINE) 520 self.assertRegex(debugout, to_addr) 521 522 def testSendMessageWithSpecifiedAddresses(self): 523 # Make sure addresses specified in call override those in message. 524 m = email.mime.text.MIMEText('A test message') 525 m['From'] = 'foo@bar.com' 526 m['To'] = 'John, Dinsdale' 527 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 528 timeout=support.LOOPBACK_TIMEOUT) 529 self.addCleanup(smtp.close) 530 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 531 # XXX (see comment in testSend) 532 time.sleep(0.01) 533 smtp.quit() 534 535 self.client_evt.set() 536 self.serv_evt.wait() 537 self.output.flush() 538 # Remove the X-Peer header that DebuggingServer adds. 539 test_output = self.get_output_without_xpeer() 540 del m['X-Peer'] 541 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 542 self.assertEqual(test_output, mexpect) 543 debugout = smtpd.DEBUGSTREAM.getvalue() 544 sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 545 self.assertRegex(debugout, sender) 546 for addr in ('John', 'Dinsdale'): 547 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 548 re.MULTILINE) 549 self.assertNotRegex(debugout, to_addr) 550 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 551 self.assertRegex(debugout, recip) 552 553 def testSendMessageWithMultipleFrom(self): 554 # Sender overrides To 555 m = email.mime.text.MIMEText('A test message') 556 m['From'] = 'Bernard, Bianca' 557 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 558 m['To'] = 'John, Dinsdale' 559 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 560 timeout=support.LOOPBACK_TIMEOUT) 561 self.addCleanup(smtp.close) 562 smtp.send_message(m) 563 # XXX (see comment in testSend) 564 time.sleep(0.01) 565 smtp.quit() 566 567 self.client_evt.set() 568 self.serv_evt.wait() 569 self.output.flush() 570 # Remove the X-Peer header that DebuggingServer adds. 571 test_output = self.get_output_without_xpeer() 572 del m['X-Peer'] 573 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 574 self.assertEqual(test_output, mexpect) 575 debugout = smtpd.DEBUGSTREAM.getvalue() 576 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 577 self.assertRegex(debugout, sender) 578 for addr in ('John', 'Dinsdale'): 579 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 580 re.MULTILINE) 581 self.assertRegex(debugout, to_addr) 582 583 def testSendMessageResent(self): 584 m = email.mime.text.MIMEText('A test message') 585 m['From'] = 'foo@bar.com' 586 m['To'] = 'John' 587 m['CC'] = 'Sally, Fred' 588 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 589 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 590 m['Resent-From'] = 'holy@grail.net' 591 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 592 m['Resent-Bcc'] = 'doe@losthope.net' 593 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 594 timeout=support.LOOPBACK_TIMEOUT) 595 self.addCleanup(smtp.close) 596 smtp.send_message(m) 597 # XXX (see comment in testSend) 598 time.sleep(0.01) 599 smtp.quit() 600 601 self.client_evt.set() 602 self.serv_evt.wait() 603 self.output.flush() 604 # The Resent-Bcc headers are deleted before serialization. 605 del m['Bcc'] 606 del m['Resent-Bcc'] 607 # Remove the X-Peer header that DebuggingServer adds. 608 test_output = self.get_output_without_xpeer() 609 del m['X-Peer'] 610 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 611 self.assertEqual(test_output, mexpect) 612 debugout = smtpd.DEBUGSTREAM.getvalue() 613 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 614 self.assertRegex(debugout, sender) 615 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 616 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 617 re.MULTILINE) 618 self.assertRegex(debugout, to_addr) 619 620 def testSendMessageMultipleResentRaises(self): 621 m = email.mime.text.MIMEText('A test message') 622 m['From'] = 'foo@bar.com' 623 m['To'] = 'John' 624 m['CC'] = 'Sally, Fred' 625 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 626 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 627 m['Resent-From'] = 'holy@grail.net' 628 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 629 m['Resent-Bcc'] = 'doe@losthope.net' 630 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 631 m['Resent-To'] = 'holy@grail.net' 632 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 633 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 634 timeout=support.LOOPBACK_TIMEOUT) 635 self.addCleanup(smtp.close) 636 with self.assertRaises(ValueError): 637 smtp.send_message(m) 638 smtp.close() 639 640class NonConnectingTests(unittest.TestCase): 641 642 def testNotConnected(self): 643 # Test various operations on an unconnected SMTP object that 644 # should raise exceptions (at present the attempt in SMTP.send 645 # to reference the nonexistent 'sock' attribute of the SMTP object 646 # causes an AttributeError) 647 smtp = smtplib.SMTP() 648 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 649 self.assertRaises(smtplib.SMTPServerDisconnected, 650 smtp.send, 'test msg') 651 652 def testNonnumericPort(self): 653 # check that non-numeric port raises OSError 654 self.assertRaises(OSError, smtplib.SMTP, 655 "localhost", "bogus") 656 self.assertRaises(OSError, smtplib.SMTP, 657 "localhost:bogus") 658 659 def testSockAttributeExists(self): 660 # check that sock attribute is present outside of a connect() call 661 # (regression test, the previous behavior raised an 662 # AttributeError: 'SMTP' object has no attribute 'sock') 663 with smtplib.SMTP() as smtp: 664 self.assertIsNone(smtp.sock) 665 666 667class DefaultArgumentsTests(unittest.TestCase): 668 669 def setUp(self): 670 self.msg = EmailMessage() 671 self.msg['From'] = 'Páolo <főo@bar.com>' 672 self.smtp = smtplib.SMTP() 673 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 674 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 675 676 def testSendMessage(self): 677 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 678 self.smtp.send_message(self.msg) 679 self.smtp.send_message(self.msg) 680 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 681 expected_mail_options) 682 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 683 expected_mail_options) 684 685 def testSendMessageWithMailOptions(self): 686 mail_options = ['STARTTLS'] 687 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 688 self.smtp.send_message(self.msg, None, None, mail_options) 689 self.assertEqual(mail_options, ['STARTTLS']) 690 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 691 expected_mail_options) 692 693 694# test response of client to a non-successful HELO message 695class BadHELOServerTests(unittest.TestCase): 696 697 def setUp(self): 698 smtplib.socket = mock_socket 699 mock_socket.reply_with(b"199 no hello for you!") 700 self.old_stdout = sys.stdout 701 self.output = io.StringIO() 702 sys.stdout = self.output 703 self.port = 25 704 705 def tearDown(self): 706 smtplib.socket = socket 707 sys.stdout = self.old_stdout 708 709 def testFailingHELO(self): 710 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 711 HOST, self.port, 'localhost', 3) 712 713 714class TooLongLineTests(unittest.TestCase): 715 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 716 717 def setUp(self): 718 self.thread_key = threading_setup() 719 self.old_stdout = sys.stdout 720 self.output = io.StringIO() 721 sys.stdout = self.output 722 723 self.evt = threading.Event() 724 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 725 self.sock.settimeout(15) 726 self.port = socket_helper.bind_port(self.sock) 727 servargs = (self.evt, self.respdata, self.sock) 728 self.thread = threading.Thread(target=server, args=servargs) 729 self.thread.start() 730 self.evt.wait() 731 self.evt.clear() 732 733 def tearDown(self): 734 self.evt.wait() 735 sys.stdout = self.old_stdout 736 join_thread(self.thread) 737 del self.thread 738 self.doCleanups() 739 threading_cleanup(*self.thread_key) 740 741 def testLineTooLong(self): 742 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 743 HOST, self.port, 'localhost', 3) 744 745 746sim_users = {'Mr.A@somewhere.com':'John A', 747 'Ms.B@xn--fo-fka.com':'Sally B', 748 'Mrs.C@somewhereesle.com':'Ruth C', 749 } 750 751sim_auth = ('Mr.A@somewhere.com', 'somepassword') 752sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 753 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 754sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 755 'list-2':['Ms.B@xn--fo-fka.com',], 756 } 757 758# Simulated SMTP channel & server 759class ResponseException(Exception): pass 760class SimSMTPChannel(smtpd.SMTPChannel): 761 762 quit_response = None 763 mail_response = None 764 rcpt_response = None 765 data_response = None 766 rcpt_count = 0 767 rset_count = 0 768 disconnect = 0 769 AUTH = 99 # Add protocol state to enable auth testing. 770 authenticated_user = None 771 772 def __init__(self, extra_features, *args, **kw): 773 self._extrafeatures = ''.join( 774 [ "250-{0}\r\n".format(x) for x in extra_features ]) 775 super(SimSMTPChannel, self).__init__(*args, **kw) 776 777 # AUTH related stuff. It would be nice if support for this were in smtpd. 778 def found_terminator(self): 779 if self.smtp_state == self.AUTH: 780 line = self._emptystring.join(self.received_lines) 781 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 782 self.received_lines = [] 783 try: 784 self.auth_object(line) 785 except ResponseException as e: 786 self.smtp_state = self.COMMAND 787 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 788 return 789 super().found_terminator() 790 791 792 def smtp_AUTH(self, arg): 793 if not self.seen_greeting: 794 self.push('503 Error: send EHLO first') 795 return 796 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 797 self.push('500 Error: command "AUTH" not recognized') 798 return 799 if self.authenticated_user is not None: 800 self.push( 801 '503 Bad sequence of commands: already authenticated') 802 return 803 args = arg.split() 804 if len(args) not in [1, 2]: 805 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 806 return 807 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 808 try: 809 self.auth_object = getattr(self, auth_object_name) 810 except AttributeError: 811 self.push('504 Command parameter not implemented: unsupported ' 812 ' authentication mechanism {!r}'.format(auth_object_name)) 813 return 814 self.smtp_state = self.AUTH 815 self.auth_object(args[1] if len(args) == 2 else None) 816 817 def _authenticated(self, user, valid): 818 if valid: 819 self.authenticated_user = user 820 self.push('235 Authentication Succeeded') 821 else: 822 self.push('535 Authentication credentials invalid') 823 self.smtp_state = self.COMMAND 824 825 def _decode_base64(self, string): 826 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 827 828 def _auth_plain(self, arg=None): 829 if arg is None: 830 self.push('334 ') 831 else: 832 logpass = self._decode_base64(arg) 833 try: 834 *_, user, password = logpass.split('\0') 835 except ValueError as e: 836 self.push('535 Splitting response {!r} into user and password' 837 ' failed: {}'.format(logpass, e)) 838 return 839 self._authenticated(user, password == sim_auth[1]) 840 841 def _auth_login(self, arg=None): 842 if arg is None: 843 # base64 encoded 'Username:' 844 self.push('334 VXNlcm5hbWU6') 845 elif not hasattr(self, '_auth_login_user'): 846 self._auth_login_user = self._decode_base64(arg) 847 # base64 encoded 'Password:' 848 self.push('334 UGFzc3dvcmQ6') 849 else: 850 password = self._decode_base64(arg) 851 self._authenticated(self._auth_login_user, password == sim_auth[1]) 852 del self._auth_login_user 853 854 def _auth_cram_md5(self, arg=None): 855 if arg is None: 856 self.push('334 {}'.format(sim_cram_md5_challenge)) 857 else: 858 logpass = self._decode_base64(arg) 859 try: 860 user, hashed_pass = logpass.split() 861 except ValueError as e: 862 self.push('535 Splitting response {!r} into user and password ' 863 'failed: {}'.format(logpass, e)) 864 return False 865 valid_hashed_pass = hmac.HMAC( 866 sim_auth[1].encode('ascii'), 867 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 868 'md5').hexdigest() 869 self._authenticated(user, hashed_pass == valid_hashed_pass) 870 # end AUTH related stuff. 871 872 def smtp_EHLO(self, arg): 873 resp = ('250-testhost\r\n' 874 '250-EXPN\r\n' 875 '250-SIZE 20000000\r\n' 876 '250-STARTTLS\r\n' 877 '250-DELIVERBY\r\n') 878 resp = resp + self._extrafeatures + '250 HELP' 879 self.push(resp) 880 self.seen_greeting = arg 881 self.extended_smtp = True 882 883 def smtp_VRFY(self, arg): 884 # For max compatibility smtplib should be sending the raw address. 885 if arg in sim_users: 886 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 887 else: 888 self.push('550 No such user: %s' % arg) 889 890 def smtp_EXPN(self, arg): 891 list_name = arg.lower() 892 if list_name in sim_lists: 893 user_list = sim_lists[list_name] 894 for n, user_email in enumerate(user_list): 895 quoted_addr = smtplib.quoteaddr(user_email) 896 if n < len(user_list) - 1: 897 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 898 else: 899 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 900 else: 901 self.push('550 No access for you!') 902 903 def smtp_QUIT(self, arg): 904 if self.quit_response is None: 905 super(SimSMTPChannel, self).smtp_QUIT(arg) 906 else: 907 self.push(self.quit_response) 908 self.close_when_done() 909 910 def smtp_MAIL(self, arg): 911 if self.mail_response is None: 912 super().smtp_MAIL(arg) 913 else: 914 self.push(self.mail_response) 915 if self.disconnect: 916 self.close_when_done() 917 918 def smtp_RCPT(self, arg): 919 if self.rcpt_response is None: 920 super().smtp_RCPT(arg) 921 return 922 self.rcpt_count += 1 923 self.push(self.rcpt_response[self.rcpt_count-1]) 924 925 def smtp_RSET(self, arg): 926 self.rset_count += 1 927 super().smtp_RSET(arg) 928 929 def smtp_DATA(self, arg): 930 if self.data_response is None: 931 super().smtp_DATA(arg) 932 else: 933 self.push(self.data_response) 934 935 def handle_error(self): 936 raise 937 938 939class SimSMTPServer(smtpd.SMTPServer): 940 941 channel_class = SimSMTPChannel 942 943 def __init__(self, *args, **kw): 944 self._extra_features = [] 945 self._addresses = {} 946 smtpd.SMTPServer.__init__(self, *args, **kw) 947 948 def handle_accepted(self, conn, addr): 949 self._SMTPchannel = self.channel_class( 950 self._extra_features, self, conn, addr, 951 decode_data=self._decode_data) 952 953 def process_message(self, peer, mailfrom, rcpttos, data): 954 self._addresses['from'] = mailfrom 955 self._addresses['tos'] = rcpttos 956 957 def add_feature(self, feature): 958 self._extra_features.append(feature) 959 960 def handle_error(self): 961 raise 962 963 964# Test various SMTP & ESMTP commands/behaviors that require a simulated server 965# (i.e., something with more features than DebuggingServer) 966class SMTPSimTests(unittest.TestCase): 967 968 def setUp(self): 969 self.thread_key = threading_setup() 970 self.real_getfqdn = socket.getfqdn 971 socket.getfqdn = mock_socket.getfqdn 972 self.serv_evt = threading.Event() 973 self.client_evt = threading.Event() 974 # Pick a random unused port by passing 0 for the port number 975 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 976 # Keep a note of what port was assigned 977 self.port = self.serv.socket.getsockname()[1] 978 serv_args = (self.serv, self.serv_evt, self.client_evt) 979 self.thread = threading.Thread(target=debugging_server, args=serv_args) 980 self.thread.start() 981 982 # wait until server thread has assigned a port number 983 self.serv_evt.wait() 984 self.serv_evt.clear() 985 986 def tearDown(self): 987 socket.getfqdn = self.real_getfqdn 988 # indicate that the client is finished 989 self.client_evt.set() 990 # wait for the server thread to terminate 991 self.serv_evt.wait() 992 join_thread(self.thread) 993 del self.thread 994 self.doCleanups() 995 threading_cleanup(*self.thread_key) 996 997 def testBasic(self): 998 # smoke test 999 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1000 timeout=support.LOOPBACK_TIMEOUT) 1001 smtp.quit() 1002 1003 def testEHLO(self): 1004 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1005 timeout=support.LOOPBACK_TIMEOUT) 1006 1007 # no features should be present before the EHLO 1008 self.assertEqual(smtp.esmtp_features, {}) 1009 1010 # features expected from the test server 1011 expected_features = {'expn':'', 1012 'size': '20000000', 1013 'starttls': '', 1014 'deliverby': '', 1015 'help': '', 1016 } 1017 1018 smtp.ehlo() 1019 self.assertEqual(smtp.esmtp_features, expected_features) 1020 for k in expected_features: 1021 self.assertTrue(smtp.has_extn(k)) 1022 self.assertFalse(smtp.has_extn('unsupported-feature')) 1023 smtp.quit() 1024 1025 def testVRFY(self): 1026 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1027 timeout=support.LOOPBACK_TIMEOUT) 1028 1029 for addr_spec, name in sim_users.items(): 1030 expected_known = (250, bytes('%s %s' % 1031 (name, smtplib.quoteaddr(addr_spec)), 1032 "ascii")) 1033 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 1034 1035 u = 'nobody@nowhere.com' 1036 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 1037 self.assertEqual(smtp.vrfy(u), expected_unknown) 1038 smtp.quit() 1039 1040 def testEXPN(self): 1041 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1042 timeout=support.LOOPBACK_TIMEOUT) 1043 1044 for listname, members in sim_lists.items(): 1045 users = [] 1046 for m in members: 1047 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 1048 expected_known = (250, bytes('\n'.join(users), "ascii")) 1049 self.assertEqual(smtp.expn(listname), expected_known) 1050 1051 u = 'PSU-Members-List' 1052 expected_unknown = (550, b'No access for you!') 1053 self.assertEqual(smtp.expn(u), expected_unknown) 1054 smtp.quit() 1055 1056 def testAUTH_PLAIN(self): 1057 self.serv.add_feature("AUTH PLAIN") 1058 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1059 timeout=support.LOOPBACK_TIMEOUT) 1060 resp = smtp.login(sim_auth[0], sim_auth[1]) 1061 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1062 smtp.close() 1063 1064 def testAUTH_LOGIN(self): 1065 self.serv.add_feature("AUTH LOGIN") 1066 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1067 timeout=support.LOOPBACK_TIMEOUT) 1068 resp = smtp.login(sim_auth[0], sim_auth[1]) 1069 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1070 smtp.close() 1071 1072 @hashlib_helper.requires_hashdigest('md5') 1073 def testAUTH_CRAM_MD5(self): 1074 self.serv.add_feature("AUTH CRAM-MD5") 1075 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1076 timeout=support.LOOPBACK_TIMEOUT) 1077 resp = smtp.login(sim_auth[0], sim_auth[1]) 1078 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1079 smtp.close() 1080 1081 @hashlib_helper.requires_hashdigest('md5') 1082 def testAUTH_multiple(self): 1083 # Test that multiple authentication methods are tried. 1084 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1085 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1086 timeout=support.LOOPBACK_TIMEOUT) 1087 resp = smtp.login(sim_auth[0], sim_auth[1]) 1088 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1089 smtp.close() 1090 1091 def test_auth_function(self): 1092 supported = {'PLAIN', 'LOGIN'} 1093 try: 1094 hashlib.md5() 1095 except ValueError: 1096 pass 1097 else: 1098 supported.add('CRAM-MD5') 1099 for mechanism in supported: 1100 self.serv.add_feature("AUTH {}".format(mechanism)) 1101 for mechanism in supported: 1102 with self.subTest(mechanism=mechanism): 1103 smtp = smtplib.SMTP(HOST, self.port, 1104 local_hostname='localhost', 1105 timeout=support.LOOPBACK_TIMEOUT) 1106 smtp.ehlo('foo') 1107 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1108 method = 'auth_' + mechanism.lower().replace('-', '_') 1109 resp = smtp.auth(mechanism, getattr(smtp, method)) 1110 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1111 smtp.close() 1112 1113 def test_quit_resets_greeting(self): 1114 smtp = smtplib.SMTP(HOST, self.port, 1115 local_hostname='localhost', 1116 timeout=support.LOOPBACK_TIMEOUT) 1117 code, message = smtp.ehlo() 1118 self.assertEqual(code, 250) 1119 self.assertIn('size', smtp.esmtp_features) 1120 smtp.quit() 1121 self.assertNotIn('size', smtp.esmtp_features) 1122 smtp.connect(HOST, self.port) 1123 self.assertNotIn('size', smtp.esmtp_features) 1124 smtp.ehlo_or_helo_if_needed() 1125 self.assertIn('size', smtp.esmtp_features) 1126 smtp.quit() 1127 1128 def test_with_statement(self): 1129 with smtplib.SMTP(HOST, self.port) as smtp: 1130 code, message = smtp.noop() 1131 self.assertEqual(code, 250) 1132 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1133 with smtplib.SMTP(HOST, self.port) as smtp: 1134 smtp.close() 1135 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1136 1137 def test_with_statement_QUIT_failure(self): 1138 with self.assertRaises(smtplib.SMTPResponseException) as error: 1139 with smtplib.SMTP(HOST, self.port) as smtp: 1140 smtp.noop() 1141 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1142 self.assertEqual(error.exception.smtp_code, 421) 1143 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1144 1145 #TODO: add tests for correct AUTH method fallback now that the 1146 #test infrastructure can support it. 1147 1148 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1149 def test__rest_from_mail_cmd(self): 1150 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1151 timeout=support.LOOPBACK_TIMEOUT) 1152 smtp.noop() 1153 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1154 self.serv._SMTPchannel.disconnect = True 1155 with self.assertRaises(smtplib.SMTPSenderRefused): 1156 smtp.sendmail('John', 'Sally', 'test message') 1157 self.assertIsNone(smtp.sock) 1158 1159 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1160 def test_421_from_mail_cmd(self): 1161 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1162 timeout=support.LOOPBACK_TIMEOUT) 1163 smtp.noop() 1164 self.serv._SMTPchannel.mail_response = '421 closing connection' 1165 with self.assertRaises(smtplib.SMTPSenderRefused): 1166 smtp.sendmail('John', 'Sally', 'test message') 1167 self.assertIsNone(smtp.sock) 1168 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1169 1170 def test_421_from_rcpt_cmd(self): 1171 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1172 timeout=support.LOOPBACK_TIMEOUT) 1173 smtp.noop() 1174 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1175 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1176 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1177 self.assertIsNone(smtp.sock) 1178 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1179 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1180 1181 def test_421_from_data_cmd(self): 1182 class MySimSMTPChannel(SimSMTPChannel): 1183 def found_terminator(self): 1184 if self.smtp_state == self.DATA: 1185 self.push('421 closing') 1186 else: 1187 super().found_terminator() 1188 self.serv.channel_class = MySimSMTPChannel 1189 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1190 timeout=support.LOOPBACK_TIMEOUT) 1191 smtp.noop() 1192 with self.assertRaises(smtplib.SMTPDataError): 1193 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1194 self.assertIsNone(smtp.sock) 1195 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1196 1197 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1198 smtp = smtplib.SMTP( 1199 HOST, self.port, local_hostname='localhost', 1200 timeout=support.LOOPBACK_TIMEOUT) 1201 self.addCleanup(smtp.close) 1202 smtp.ehlo() 1203 self.assertTrue(smtp.does_esmtp) 1204 self.assertFalse(smtp.has_extn('smtputf8')) 1205 self.assertRaises( 1206 smtplib.SMTPNotSupportedError, 1207 smtp.sendmail, 1208 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1209 self.assertRaises( 1210 smtplib.SMTPNotSupportedError, 1211 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1212 1213 def test_send_unicode_without_SMTPUTF8(self): 1214 smtp = smtplib.SMTP( 1215 HOST, self.port, local_hostname='localhost', 1216 timeout=support.LOOPBACK_TIMEOUT) 1217 self.addCleanup(smtp.close) 1218 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1219 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1220 1221 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1222 # This test is located here and not in the SMTPUTF8SimTests 1223 # class because it needs a "regular" SMTP server to work 1224 msg = EmailMessage() 1225 msg['From'] = "Páolo <főo@bar.com>" 1226 msg['To'] = 'Dinsdale' 1227 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1228 smtp = smtplib.SMTP( 1229 HOST, self.port, local_hostname='localhost', 1230 timeout=support.LOOPBACK_TIMEOUT) 1231 self.addCleanup(smtp.close) 1232 with self.assertRaises(smtplib.SMTPNotSupportedError): 1233 smtp.send_message(msg) 1234 1235 def test_name_field_not_included_in_envelop_addresses(self): 1236 smtp = smtplib.SMTP( 1237 HOST, self.port, local_hostname='localhost', 1238 timeout=support.LOOPBACK_TIMEOUT) 1239 self.addCleanup(smtp.close) 1240 1241 message = EmailMessage() 1242 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1243 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1244 1245 self.assertDictEqual(smtp.send_message(message), {}) 1246 1247 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1248 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1249 1250 1251class SimSMTPUTF8Server(SimSMTPServer): 1252 1253 def __init__(self, *args, **kw): 1254 # The base SMTP server turns these on automatically, but our test 1255 # server is set up to munge the EHLO response, so we need to provide 1256 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1257 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1258 smtpd.SMTPServer.__init__(self, *args, **kw) 1259 1260 def handle_accepted(self, conn, addr): 1261 self._SMTPchannel = self.channel_class( 1262 self._extra_features, self, conn, addr, 1263 decode_data=self._decode_data, 1264 enable_SMTPUTF8=self.enable_SMTPUTF8, 1265 ) 1266 1267 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1268 rcpt_options=None): 1269 self.last_peer = peer 1270 self.last_mailfrom = mailfrom 1271 self.last_rcpttos = rcpttos 1272 self.last_message = data 1273 self.last_mail_options = mail_options 1274 self.last_rcpt_options = rcpt_options 1275 1276 1277class SMTPUTF8SimTests(unittest.TestCase): 1278 1279 maxDiff = None 1280 1281 def setUp(self): 1282 self.thread_key = threading_setup() 1283 self.real_getfqdn = socket.getfqdn 1284 socket.getfqdn = mock_socket.getfqdn 1285 self.serv_evt = threading.Event() 1286 self.client_evt = threading.Event() 1287 # Pick a random unused port by passing 0 for the port number 1288 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1289 decode_data=False, 1290 enable_SMTPUTF8=True) 1291 # Keep a note of what port was assigned 1292 self.port = self.serv.socket.getsockname()[1] 1293 serv_args = (self.serv, self.serv_evt, self.client_evt) 1294 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1295 self.thread.start() 1296 1297 # wait until server thread has assigned a port number 1298 self.serv_evt.wait() 1299 self.serv_evt.clear() 1300 1301 def tearDown(self): 1302 socket.getfqdn = self.real_getfqdn 1303 # indicate that the client is finished 1304 self.client_evt.set() 1305 # wait for the server thread to terminate 1306 self.serv_evt.wait() 1307 join_thread(self.thread) 1308 del self.thread 1309 self.doCleanups() 1310 threading_cleanup(*self.thread_key) 1311 1312 def test_test_server_supports_extensions(self): 1313 smtp = smtplib.SMTP( 1314 HOST, self.port, local_hostname='localhost', 1315 timeout=support.LOOPBACK_TIMEOUT) 1316 self.addCleanup(smtp.close) 1317 smtp.ehlo() 1318 self.assertTrue(smtp.does_esmtp) 1319 self.assertTrue(smtp.has_extn('smtputf8')) 1320 1321 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1322 m = '¡a test message containing unicode!'.encode('utf-8') 1323 smtp = smtplib.SMTP( 1324 HOST, self.port, local_hostname='localhost', 1325 timeout=support.LOOPBACK_TIMEOUT) 1326 self.addCleanup(smtp.close) 1327 smtp.sendmail('Jőhn', 'Sálly', m, 1328 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1329 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1330 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1331 self.assertEqual(self.serv.last_message, m) 1332 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1333 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1334 self.assertEqual(self.serv.last_rcpt_options, []) 1335 1336 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1337 m = '¡a test message containing unicode!'.encode('utf-8') 1338 smtp = smtplib.SMTP( 1339 HOST, self.port, local_hostname='localhost', 1340 timeout=support.LOOPBACK_TIMEOUT) 1341 self.addCleanup(smtp.close) 1342 smtp.ehlo() 1343 self.assertEqual( 1344 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1345 (250, b'OK')) 1346 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1347 self.assertEqual(smtp.data(m), (250, b'OK')) 1348 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1349 self.assertEqual(self.serv.last_rcpttos, ['János']) 1350 self.assertEqual(self.serv.last_message, m) 1351 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1352 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1353 self.assertEqual(self.serv.last_rcpt_options, []) 1354 1355 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1356 msg = EmailMessage() 1357 msg['From'] = "Páolo <főo@bar.com>" 1358 msg['To'] = 'Dinsdale' 1359 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1360 # XXX I don't know why I need two \n's here, but this is an existing 1361 # bug (if it is one) and not a problem with the new functionality. 1362 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1363 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1364 # we are successfully sending /r/n :(. 1365 expected = textwrap.dedent("""\ 1366 From: Páolo <főo@bar.com> 1367 To: Dinsdale 1368 Subject: Nudge nudge, wink, wink \u1F609 1369 Content-Type: text/plain; charset="utf-8" 1370 Content-Transfer-Encoding: 8bit 1371 MIME-Version: 1.0 1372 1373 oh là là, know what I mean, know what I mean? 1374 """) 1375 smtp = smtplib.SMTP( 1376 HOST, self.port, local_hostname='localhost', 1377 timeout=support.LOOPBACK_TIMEOUT) 1378 self.addCleanup(smtp.close) 1379 self.assertEqual(smtp.send_message(msg), {}) 1380 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1381 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1382 self.assertEqual(self.serv.last_message.decode(), expected) 1383 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1384 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1385 self.assertEqual(self.serv.last_rcpt_options, []) 1386 1387 1388EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1389 1390class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1391 def smtp_AUTH(self, arg): 1392 # RFC 4954's AUTH command allows for an optional initial-response. 1393 # Not all AUTH methods support this; some require a challenge. AUTH 1394 # PLAIN does those, so test that here. See issue #15014. 1395 args = arg.split() 1396 if args[0].lower() == 'plain': 1397 if len(args) == 2: 1398 # AUTH PLAIN <initial-response> with the response base 64 1399 # encoded. Hard code the expected response for the test. 1400 if args[1] == EXPECTED_RESPONSE: 1401 self.push('235 Ok') 1402 return 1403 self.push('571 Bad authentication') 1404 1405class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1406 channel_class = SimSMTPAUTHInitialResponseChannel 1407 1408 1409class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1410 def setUp(self): 1411 self.thread_key = threading_setup() 1412 self.real_getfqdn = socket.getfqdn 1413 socket.getfqdn = mock_socket.getfqdn 1414 self.serv_evt = threading.Event() 1415 self.client_evt = threading.Event() 1416 # Pick a random unused port by passing 0 for the port number 1417 self.serv = SimSMTPAUTHInitialResponseServer( 1418 (HOST, 0), ('nowhere', -1), decode_data=True) 1419 # Keep a note of what port was assigned 1420 self.port = self.serv.socket.getsockname()[1] 1421 serv_args = (self.serv, self.serv_evt, self.client_evt) 1422 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1423 self.thread.start() 1424 1425 # wait until server thread has assigned a port number 1426 self.serv_evt.wait() 1427 self.serv_evt.clear() 1428 1429 def tearDown(self): 1430 socket.getfqdn = self.real_getfqdn 1431 # indicate that the client is finished 1432 self.client_evt.set() 1433 # wait for the server thread to terminate 1434 self.serv_evt.wait() 1435 join_thread(self.thread) 1436 del self.thread 1437 self.doCleanups() 1438 threading_cleanup(*self.thread_key) 1439 1440 def testAUTH_PLAIN_initial_response_login(self): 1441 self.serv.add_feature('AUTH PLAIN') 1442 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1443 timeout=support.LOOPBACK_TIMEOUT) 1444 smtp.login('psu', 'doesnotexist') 1445 smtp.close() 1446 1447 def testAUTH_PLAIN_initial_response_auth(self): 1448 self.serv.add_feature('AUTH PLAIN') 1449 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1450 timeout=support.LOOPBACK_TIMEOUT) 1451 smtp.user = 'psu' 1452 smtp.password = 'doesnotexist' 1453 code, response = smtp.auth('plain', smtp.auth_plain) 1454 smtp.close() 1455 self.assertEqual(code, 235) 1456 1457 1458if __name__ == '__main__': 1459 unittest.main() 1460