1import asyncore 2import base64 3import email.mime.text 4from email.message import EmailMessage 5from email.base64mime import body_encode as encode_base64 6import email.utils 7import hashlib 8import hmac 9import socket 10import smtpd 11import smtplib 12import io 13import re 14import sys 15import time 16import select 17import errno 18import textwrap 19import threading 20 21import unittest 22from test import support, mock_socket 23from test.support import hashlib_helper 24from test.support import socket_helper 25from test.support import threading_setup, threading_cleanup, join_thread 26from unittest.mock import Mock 27 28HOST = socket_helper.HOST 29 30if sys.platform == 'darwin': 31 # select.poll returns a select.POLLHUP at the end of the tests 32 # on darwin, so just ignore it 33 def handle_expt(self): 34 pass 35 smtpd.SMTPChannel.handle_expt = handle_expt 36 37 38def server(evt, buf, serv): 39 serv.listen() 40 evt.set() 41 try: 42 conn, addr = serv.accept() 43 except socket.timeout: 44 pass 45 else: 46 n = 500 47 while buf and n > 0: 48 r, w, e = select.select([], [conn], []) 49 if w: 50 sent = conn.send(buf) 51 buf = buf[sent:] 52 53 n -= 1 54 55 conn.close() 56 finally: 57 serv.close() 58 evt.set() 59 60class GeneralTests: 61 62 def setUp(self): 63 smtplib.socket = mock_socket 64 self.port = 25 65 66 def tearDown(self): 67 smtplib.socket = socket 68 69 # This method is no longer used but is retained for backward compatibility, 70 # so test to make sure it still works. 71 def testQuoteData(self): 72 teststr = "abc\n.jkl\rfoo\r\n..blue" 73 expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 74 self.assertEqual(expected, smtplib.quotedata(teststr)) 75 76 def testBasic1(self): 77 mock_socket.reply_with(b"220 Hola mundo") 78 # connects 79 client = self.client(HOST, self.port) 80 client.close() 81 82 def testSourceAddress(self): 83 mock_socket.reply_with(b"220 Hola mundo") 84 # connects 85 client = self.client(HOST, self.port, 86 source_address=('127.0.0.1',19876)) 87 self.assertEqual(client.source_address, ('127.0.0.1', 19876)) 88 client.close() 89 90 def testBasic2(self): 91 mock_socket.reply_with(b"220 Hola mundo") 92 # connects, include port in host name 93 client = self.client("%s:%s" % (HOST, self.port)) 94 client.close() 95 96 def testLocalHostName(self): 97 mock_socket.reply_with(b"220 Hola mundo") 98 # check that supplied local_hostname is used 99 client = self.client(HOST, self.port, local_hostname="testhost") 100 self.assertEqual(client.local_hostname, "testhost") 101 client.close() 102 103 def testTimeoutDefault(self): 104 mock_socket.reply_with(b"220 Hola mundo") 105 self.assertIsNone(mock_socket.getdefaulttimeout()) 106 mock_socket.setdefaulttimeout(30) 107 self.assertEqual(mock_socket.getdefaulttimeout(), 30) 108 try: 109 client = self.client(HOST, self.port) 110 finally: 111 mock_socket.setdefaulttimeout(None) 112 self.assertEqual(client.sock.gettimeout(), 30) 113 client.close() 114 115 def testTimeoutNone(self): 116 mock_socket.reply_with(b"220 Hola mundo") 117 self.assertIsNone(socket.getdefaulttimeout()) 118 socket.setdefaulttimeout(30) 119 try: 120 client = self.client(HOST, self.port, timeout=None) 121 finally: 122 socket.setdefaulttimeout(None) 123 self.assertIsNone(client.sock.gettimeout()) 124 client.close() 125 126 def testTimeoutZero(self): 127 mock_socket.reply_with(b"220 Hola mundo") 128 with self.assertRaises(ValueError): 129 self.client(HOST, self.port, timeout=0) 130 131 def testTimeoutValue(self): 132 mock_socket.reply_with(b"220 Hola mundo") 133 client = self.client(HOST, self.port, timeout=30) 134 self.assertEqual(client.sock.gettimeout(), 30) 135 client.close() 136 137 def test_debuglevel(self): 138 mock_socket.reply_with(b"220 Hello world") 139 client = self.client() 140 client.set_debuglevel(1) 141 with support.captured_stderr() as stderr: 142 client.connect(HOST, self.port) 143 client.close() 144 expected = re.compile(r"^connect:", re.MULTILINE) 145 self.assertRegex(stderr.getvalue(), expected) 146 147 def test_debuglevel_2(self): 148 mock_socket.reply_with(b"220 Hello world") 149 client = self.client() 150 client.set_debuglevel(2) 151 with support.captured_stderr() as stderr: 152 client.connect(HOST, self.port) 153 client.close() 154 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 155 re.MULTILINE) 156 self.assertRegex(stderr.getvalue(), expected) 157 158 159class SMTPGeneralTests(GeneralTests, unittest.TestCase): 160 161 client = smtplib.SMTP 162 163 164class LMTPGeneralTests(GeneralTests, unittest.TestCase): 165 166 client = smtplib.LMTP 167 168 def testTimeoutZero(self): 169 super().testTimeoutZero() 170 local_host = '/some/local/lmtp/delivery/program' 171 with self.assertRaises(ValueError): 172 self.client(local_host, timeout=0) 173 174# Test server thread using the specified SMTP server class 175def debugging_server(serv, serv_evt, client_evt): 176 serv_evt.set() 177 178 try: 179 if hasattr(select, 'poll'): 180 poll_fun = asyncore.poll2 181 else: 182 poll_fun = asyncore.poll 183 184 n = 1000 185 while asyncore.socket_map and n > 0: 186 poll_fun(0.01, asyncore.socket_map) 187 188 # when the client conversation is finished, it will 189 # set client_evt, and it's then ok to kill the server 190 if client_evt.is_set(): 191 serv.close() 192 break 193 194 n -= 1 195 196 except socket.timeout: 197 pass 198 finally: 199 if not client_evt.is_set(): 200 # allow some time for the client to read the result 201 time.sleep(0.5) 202 serv.close() 203 asyncore.close_all() 204 serv_evt.set() 205 206MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 207MSG_END = '------------ END MESSAGE ------------\n' 208 209# NOTE: Some SMTP objects in the tests below are created with a non-default 210# local_hostname argument to the constructor, since (on some systems) the FQDN 211# lookup caused by the default local_hostname sometimes takes so long that the 212# test server times out, causing the test to fail. 213 214# Test behavior of smtpd.DebuggingServer 215class DebuggingServerTests(unittest.TestCase): 216 217 maxDiff = None 218 219 def setUp(self): 220 self.thread_key = threading_setup() 221 self.real_getfqdn = socket.getfqdn 222 socket.getfqdn = mock_socket.getfqdn 223 # temporarily replace sys.stdout to capture DebuggingServer output 224 self.old_stdout = sys.stdout 225 self.output = io.StringIO() 226 sys.stdout = self.output 227 228 self.serv_evt = threading.Event() 229 self.client_evt = threading.Event() 230 # Capture SMTPChannel debug output 231 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 232 smtpd.DEBUGSTREAM = io.StringIO() 233 # Pick a random unused port by passing 0 for the port number 234 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 235 decode_data=True) 236 # Keep a note of what server host and port were assigned 237 self.host, self.port = self.serv.socket.getsockname()[:2] 238 serv_args = (self.serv, self.serv_evt, self.client_evt) 239 self.thread = threading.Thread(target=debugging_server, args=serv_args) 240 self.thread.start() 241 242 # wait until server thread has assigned a port number 243 self.serv_evt.wait() 244 self.serv_evt.clear() 245 246 def tearDown(self): 247 socket.getfqdn = self.real_getfqdn 248 # indicate that the client is finished 249 self.client_evt.set() 250 # wait for the server thread to terminate 251 self.serv_evt.wait() 252 join_thread(self.thread) 253 # restore sys.stdout 254 sys.stdout = self.old_stdout 255 # restore DEBUGSTREAM 256 smtpd.DEBUGSTREAM.close() 257 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 258 del self.thread 259 self.doCleanups() 260 threading_cleanup(*self.thread_key) 261 262 def get_output_without_xpeer(self): 263 test_output = self.output.getvalue() 264 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 265 test_output, flags=re.MULTILINE|re.DOTALL) 266 267 def testBasic(self): 268 # connect 269 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 270 timeout=support.LOOPBACK_TIMEOUT) 271 smtp.quit() 272 273 def testSourceAddress(self): 274 # connect 275 src_port = socket_helper.find_unused_port() 276 try: 277 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 278 timeout=support.LOOPBACK_TIMEOUT, 279 source_address=(self.host, src_port)) 280 self.addCleanup(smtp.close) 281 self.assertEqual(smtp.source_address, (self.host, src_port)) 282 self.assertEqual(smtp.local_hostname, 'localhost') 283 smtp.quit() 284 except OSError as e: 285 if e.errno == errno.EADDRINUSE: 286 self.skipTest("couldn't bind to source port %d" % src_port) 287 raise 288 289 def testNOOP(self): 290 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 291 timeout=support.LOOPBACK_TIMEOUT) 292 self.addCleanup(smtp.close) 293 expected = (250, b'OK') 294 self.assertEqual(smtp.noop(), expected) 295 smtp.quit() 296 297 def testRSET(self): 298 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 299 timeout=support.LOOPBACK_TIMEOUT) 300 self.addCleanup(smtp.close) 301 expected = (250, b'OK') 302 self.assertEqual(smtp.rset(), expected) 303 smtp.quit() 304 305 def testELHO(self): 306 # EHLO isn't implemented in DebuggingServer 307 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 308 timeout=support.LOOPBACK_TIMEOUT) 309 self.addCleanup(smtp.close) 310 expected = (250, b'\nSIZE 33554432\nHELP') 311 self.assertEqual(smtp.ehlo(), expected) 312 smtp.quit() 313 314 def testEXPNNotImplemented(self): 315 # EXPN isn't implemented in DebuggingServer 316 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 317 timeout=support.LOOPBACK_TIMEOUT) 318 self.addCleanup(smtp.close) 319 expected = (502, b'EXPN not implemented') 320 smtp.putcmd('EXPN') 321 self.assertEqual(smtp.getreply(), expected) 322 smtp.quit() 323 324 def testVRFY(self): 325 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 326 timeout=support.LOOPBACK_TIMEOUT) 327 self.addCleanup(smtp.close) 328 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 329 b'and attempt delivery') 330 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 331 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 332 smtp.quit() 333 334 def testSecondHELO(self): 335 # check that a second HELO returns a message that it's a duplicate 336 # (this behavior is specific to smtpd.SMTPChannel) 337 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 338 timeout=support.LOOPBACK_TIMEOUT) 339 self.addCleanup(smtp.close) 340 smtp.helo() 341 expected = (503, b'Duplicate HELO/EHLO') 342 self.assertEqual(smtp.helo(), expected) 343 smtp.quit() 344 345 def testHELP(self): 346 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 347 timeout=support.LOOPBACK_TIMEOUT) 348 self.addCleanup(smtp.close) 349 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 350 b'RCPT DATA RSET NOOP QUIT VRFY') 351 smtp.quit() 352 353 def testSend(self): 354 # connect and send mail 355 m = 'A test message' 356 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 357 timeout=support.LOOPBACK_TIMEOUT) 358 self.addCleanup(smtp.close) 359 smtp.sendmail('John', 'Sally', m) 360 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 361 # in asyncore. This sleep might help, but should really be fixed 362 # properly by using an Event variable. 363 time.sleep(0.01) 364 smtp.quit() 365 366 self.client_evt.set() 367 self.serv_evt.wait() 368 self.output.flush() 369 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 370 self.assertEqual(self.output.getvalue(), mexpect) 371 372 def testSendBinary(self): 373 m = b'A test message' 374 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 375 timeout=support.LOOPBACK_TIMEOUT) 376 self.addCleanup(smtp.close) 377 smtp.sendmail('John', 'Sally', m) 378 # XXX (see comment in testSend) 379 time.sleep(0.01) 380 smtp.quit() 381 382 self.client_evt.set() 383 self.serv_evt.wait() 384 self.output.flush() 385 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 386 self.assertEqual(self.output.getvalue(), mexpect) 387 388 def testSendNeedingDotQuote(self): 389 # Issue 12283 390 m = '.A test\n.mes.sage.' 391 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 392 timeout=support.LOOPBACK_TIMEOUT) 393 self.addCleanup(smtp.close) 394 smtp.sendmail('John', 'Sally', m) 395 # XXX (see comment in testSend) 396 time.sleep(0.01) 397 smtp.quit() 398 399 self.client_evt.set() 400 self.serv_evt.wait() 401 self.output.flush() 402 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 403 self.assertEqual(self.output.getvalue(), mexpect) 404 405 def testSendNullSender(self): 406 m = 'A test message' 407 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 408 timeout=support.LOOPBACK_TIMEOUT) 409 self.addCleanup(smtp.close) 410 smtp.sendmail('<>', 'Sally', m) 411 # XXX (see comment in testSend) 412 time.sleep(0.01) 413 smtp.quit() 414 415 self.client_evt.set() 416 self.serv_evt.wait() 417 self.output.flush() 418 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 419 self.assertEqual(self.output.getvalue(), mexpect) 420 debugout = smtpd.DEBUGSTREAM.getvalue() 421 sender = re.compile("^sender: <>$", re.MULTILINE) 422 self.assertRegex(debugout, sender) 423 424 def testSendMessage(self): 425 m = email.mime.text.MIMEText('A test message') 426 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 427 timeout=support.LOOPBACK_TIMEOUT) 428 self.addCleanup(smtp.close) 429 smtp.send_message(m, from_addr='John', to_addrs='Sally') 430 # XXX (see comment in testSend) 431 time.sleep(0.01) 432 smtp.quit() 433 434 self.client_evt.set() 435 self.serv_evt.wait() 436 self.output.flush() 437 # Remove the X-Peer header that DebuggingServer adds as figuring out 438 # exactly what IP address format is put there is not easy (and 439 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 440 # not always the same as socket.gethostbyname(HOST). :( 441 test_output = self.get_output_without_xpeer() 442 del m['X-Peer'] 443 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 444 self.assertEqual(test_output, mexpect) 445 446 def testSendMessageWithAddresses(self): 447 m = email.mime.text.MIMEText('A test message') 448 m['From'] = 'foo@bar.com' 449 m['To'] = 'John' 450 m['CC'] = 'Sally, Fred' 451 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 452 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 453 timeout=support.LOOPBACK_TIMEOUT) 454 self.addCleanup(smtp.close) 455 smtp.send_message(m) 456 # XXX (see comment in testSend) 457 time.sleep(0.01) 458 smtp.quit() 459 # make sure the Bcc header is still in the message. 460 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 461 '<warped@silly.walks.com>') 462 463 self.client_evt.set() 464 self.serv_evt.wait() 465 self.output.flush() 466 # Remove the X-Peer header that DebuggingServer adds. 467 test_output = self.get_output_without_xpeer() 468 del m['X-Peer'] 469 # The Bcc header should not be transmitted. 470 del m['Bcc'] 471 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 472 self.assertEqual(test_output, mexpect) 473 debugout = smtpd.DEBUGSTREAM.getvalue() 474 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 475 self.assertRegex(debugout, sender) 476 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 477 'warped@silly.walks.com'): 478 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 479 re.MULTILINE) 480 self.assertRegex(debugout, to_addr) 481 482 def testSendMessageWithSomeAddresses(self): 483 # Make sure nothing breaks if not all of the three 'to' headers exist 484 m = email.mime.text.MIMEText('A test message') 485 m['From'] = 'foo@bar.com' 486 m['To'] = 'John, Dinsdale' 487 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 488 timeout=support.LOOPBACK_TIMEOUT) 489 self.addCleanup(smtp.close) 490 smtp.send_message(m) 491 # XXX (see comment in testSend) 492 time.sleep(0.01) 493 smtp.quit() 494 495 self.client_evt.set() 496 self.serv_evt.wait() 497 self.output.flush() 498 # Remove the X-Peer header that DebuggingServer adds. 499 test_output = self.get_output_without_xpeer() 500 del m['X-Peer'] 501 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 502 self.assertEqual(test_output, mexpect) 503 debugout = smtpd.DEBUGSTREAM.getvalue() 504 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 505 self.assertRegex(debugout, sender) 506 for addr in ('John', 'Dinsdale'): 507 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 508 re.MULTILINE) 509 self.assertRegex(debugout, to_addr) 510 511 def testSendMessageWithSpecifiedAddresses(self): 512 # Make sure addresses specified in call override those in message. 513 m = email.mime.text.MIMEText('A test message') 514 m['From'] = 'foo@bar.com' 515 m['To'] = 'John, Dinsdale' 516 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 517 timeout=support.LOOPBACK_TIMEOUT) 518 self.addCleanup(smtp.close) 519 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 520 # XXX (see comment in testSend) 521 time.sleep(0.01) 522 smtp.quit() 523 524 self.client_evt.set() 525 self.serv_evt.wait() 526 self.output.flush() 527 # Remove the X-Peer header that DebuggingServer adds. 528 test_output = self.get_output_without_xpeer() 529 del m['X-Peer'] 530 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 531 self.assertEqual(test_output, mexpect) 532 debugout = smtpd.DEBUGSTREAM.getvalue() 533 sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 534 self.assertRegex(debugout, sender) 535 for addr in ('John', 'Dinsdale'): 536 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 537 re.MULTILINE) 538 self.assertNotRegex(debugout, to_addr) 539 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 540 self.assertRegex(debugout, recip) 541 542 def testSendMessageWithMultipleFrom(self): 543 # Sender overrides To 544 m = email.mime.text.MIMEText('A test message') 545 m['From'] = 'Bernard, Bianca' 546 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 547 m['To'] = 'John, Dinsdale' 548 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 549 timeout=support.LOOPBACK_TIMEOUT) 550 self.addCleanup(smtp.close) 551 smtp.send_message(m) 552 # XXX (see comment in testSend) 553 time.sleep(0.01) 554 smtp.quit() 555 556 self.client_evt.set() 557 self.serv_evt.wait() 558 self.output.flush() 559 # Remove the X-Peer header that DebuggingServer adds. 560 test_output = self.get_output_without_xpeer() 561 del m['X-Peer'] 562 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 563 self.assertEqual(test_output, mexpect) 564 debugout = smtpd.DEBUGSTREAM.getvalue() 565 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 566 self.assertRegex(debugout, sender) 567 for addr in ('John', 'Dinsdale'): 568 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 569 re.MULTILINE) 570 self.assertRegex(debugout, to_addr) 571 572 def testSendMessageResent(self): 573 m = email.mime.text.MIMEText('A test message') 574 m['From'] = 'foo@bar.com' 575 m['To'] = 'John' 576 m['CC'] = 'Sally, Fred' 577 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 578 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 579 m['Resent-From'] = 'holy@grail.net' 580 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 581 m['Resent-Bcc'] = 'doe@losthope.net' 582 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 583 timeout=support.LOOPBACK_TIMEOUT) 584 self.addCleanup(smtp.close) 585 smtp.send_message(m) 586 # XXX (see comment in testSend) 587 time.sleep(0.01) 588 smtp.quit() 589 590 self.client_evt.set() 591 self.serv_evt.wait() 592 self.output.flush() 593 # The Resent-Bcc headers are deleted before serialization. 594 del m['Bcc'] 595 del m['Resent-Bcc'] 596 # Remove the X-Peer header that DebuggingServer adds. 597 test_output = self.get_output_without_xpeer() 598 del m['X-Peer'] 599 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 600 self.assertEqual(test_output, mexpect) 601 debugout = smtpd.DEBUGSTREAM.getvalue() 602 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 603 self.assertRegex(debugout, sender) 604 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 605 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 606 re.MULTILINE) 607 self.assertRegex(debugout, to_addr) 608 609 def testSendMessageMultipleResentRaises(self): 610 m = email.mime.text.MIMEText('A test message') 611 m['From'] = 'foo@bar.com' 612 m['To'] = 'John' 613 m['CC'] = 'Sally, Fred' 614 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 615 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 616 m['Resent-From'] = 'holy@grail.net' 617 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 618 m['Resent-Bcc'] = 'doe@losthope.net' 619 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 620 m['Resent-To'] = 'holy@grail.net' 621 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 622 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 623 timeout=support.LOOPBACK_TIMEOUT) 624 self.addCleanup(smtp.close) 625 with self.assertRaises(ValueError): 626 smtp.send_message(m) 627 smtp.close() 628 629class NonConnectingTests(unittest.TestCase): 630 631 def testNotConnected(self): 632 # Test various operations on an unconnected SMTP object that 633 # should raise exceptions (at present the attempt in SMTP.send 634 # to reference the nonexistent 'sock' attribute of the SMTP object 635 # causes an AttributeError) 636 smtp = smtplib.SMTP() 637 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 638 self.assertRaises(smtplib.SMTPServerDisconnected, 639 smtp.send, 'test msg') 640 641 def testNonnumericPort(self): 642 # check that non-numeric port raises OSError 643 self.assertRaises(OSError, smtplib.SMTP, 644 "localhost", "bogus") 645 self.assertRaises(OSError, smtplib.SMTP, 646 "localhost:bogus") 647 648 def testSockAttributeExists(self): 649 # check that sock attribute is present outside of a connect() call 650 # (regression test, the previous behavior raised an 651 # AttributeError: 'SMTP' object has no attribute 'sock') 652 with smtplib.SMTP() as smtp: 653 self.assertIsNone(smtp.sock) 654 655 656class DefaultArgumentsTests(unittest.TestCase): 657 658 def setUp(self): 659 self.msg = EmailMessage() 660 self.msg['From'] = 'Páolo <főo@bar.com>' 661 self.smtp = smtplib.SMTP() 662 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 663 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 664 665 def testSendMessage(self): 666 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 667 self.smtp.send_message(self.msg) 668 self.smtp.send_message(self.msg) 669 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 670 expected_mail_options) 671 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 672 expected_mail_options) 673 674 def testSendMessageWithMailOptions(self): 675 mail_options = ['STARTTLS'] 676 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 677 self.smtp.send_message(self.msg, None, None, mail_options) 678 self.assertEqual(mail_options, ['STARTTLS']) 679 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 680 expected_mail_options) 681 682 683# test response of client to a non-successful HELO message 684class BadHELOServerTests(unittest.TestCase): 685 686 def setUp(self): 687 smtplib.socket = mock_socket 688 mock_socket.reply_with(b"199 no hello for you!") 689 self.old_stdout = sys.stdout 690 self.output = io.StringIO() 691 sys.stdout = self.output 692 self.port = 25 693 694 def tearDown(self): 695 smtplib.socket = socket 696 sys.stdout = self.old_stdout 697 698 def testFailingHELO(self): 699 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 700 HOST, self.port, 'localhost', 3) 701 702 703class TooLongLineTests(unittest.TestCase): 704 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 705 706 def setUp(self): 707 self.thread_key = threading_setup() 708 self.old_stdout = sys.stdout 709 self.output = io.StringIO() 710 sys.stdout = self.output 711 712 self.evt = threading.Event() 713 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 714 self.sock.settimeout(15) 715 self.port = socket_helper.bind_port(self.sock) 716 servargs = (self.evt, self.respdata, self.sock) 717 self.thread = threading.Thread(target=server, args=servargs) 718 self.thread.start() 719 self.evt.wait() 720 self.evt.clear() 721 722 def tearDown(self): 723 self.evt.wait() 724 sys.stdout = self.old_stdout 725 join_thread(self.thread) 726 del self.thread 727 self.doCleanups() 728 threading_cleanup(*self.thread_key) 729 730 def testLineTooLong(self): 731 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 732 HOST, self.port, 'localhost', 3) 733 734 735sim_users = {'Mr.A@somewhere.com':'John A', 736 'Ms.B@xn--fo-fka.com':'Sally B', 737 'Mrs.C@somewhereesle.com':'Ruth C', 738 } 739 740sim_auth = ('Mr.A@somewhere.com', 'somepassword') 741sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 742 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 743sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 744 'list-2':['Ms.B@xn--fo-fka.com',], 745 } 746 747# Simulated SMTP channel & server 748class ResponseException(Exception): pass 749class SimSMTPChannel(smtpd.SMTPChannel): 750 751 quit_response = None 752 mail_response = None 753 rcpt_response = None 754 data_response = None 755 rcpt_count = 0 756 rset_count = 0 757 disconnect = 0 758 AUTH = 99 # Add protocol state to enable auth testing. 759 authenticated_user = None 760 761 def __init__(self, extra_features, *args, **kw): 762 self._extrafeatures = ''.join( 763 [ "250-{0}\r\n".format(x) for x in extra_features ]) 764 super(SimSMTPChannel, self).__init__(*args, **kw) 765 766 # AUTH related stuff. It would be nice if support for this were in smtpd. 767 def found_terminator(self): 768 if self.smtp_state == self.AUTH: 769 line = self._emptystring.join(self.received_lines) 770 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 771 self.received_lines = [] 772 try: 773 self.auth_object(line) 774 except ResponseException as e: 775 self.smtp_state = self.COMMAND 776 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 777 return 778 super().found_terminator() 779 780 781 def smtp_AUTH(self, arg): 782 if not self.seen_greeting: 783 self.push('503 Error: send EHLO first') 784 return 785 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 786 self.push('500 Error: command "AUTH" not recognized') 787 return 788 if self.authenticated_user is not None: 789 self.push( 790 '503 Bad sequence of commands: already authenticated') 791 return 792 args = arg.split() 793 if len(args) not in [1, 2]: 794 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 795 return 796 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 797 try: 798 self.auth_object = getattr(self, auth_object_name) 799 except AttributeError: 800 self.push('504 Command parameter not implemented: unsupported ' 801 ' authentication mechanism {!r}'.format(auth_object_name)) 802 return 803 self.smtp_state = self.AUTH 804 self.auth_object(args[1] if len(args) == 2 else None) 805 806 def _authenticated(self, user, valid): 807 if valid: 808 self.authenticated_user = user 809 self.push('235 Authentication Succeeded') 810 else: 811 self.push('535 Authentication credentials invalid') 812 self.smtp_state = self.COMMAND 813 814 def _decode_base64(self, string): 815 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 816 817 def _auth_plain(self, arg=None): 818 if arg is None: 819 self.push('334 ') 820 else: 821 logpass = self._decode_base64(arg) 822 try: 823 *_, user, password = logpass.split('\0') 824 except ValueError as e: 825 self.push('535 Splitting response {!r} into user and password' 826 ' failed: {}'.format(logpass, e)) 827 return 828 self._authenticated(user, password == sim_auth[1]) 829 830 def _auth_login(self, arg=None): 831 if arg is None: 832 # base64 encoded 'Username:' 833 self.push('334 VXNlcm5hbWU6') 834 elif not hasattr(self, '_auth_login_user'): 835 self._auth_login_user = self._decode_base64(arg) 836 # base64 encoded 'Password:' 837 self.push('334 UGFzc3dvcmQ6') 838 else: 839 password = self._decode_base64(arg) 840 self._authenticated(self._auth_login_user, password == sim_auth[1]) 841 del self._auth_login_user 842 843 def _auth_cram_md5(self, arg=None): 844 if arg is None: 845 self.push('334 {}'.format(sim_cram_md5_challenge)) 846 else: 847 logpass = self._decode_base64(arg) 848 try: 849 user, hashed_pass = logpass.split() 850 except ValueError as e: 851 self.push('535 Splitting response {!r} into user and password ' 852 'failed: {}'.format(logpass, e)) 853 return False 854 valid_hashed_pass = hmac.HMAC( 855 sim_auth[1].encode('ascii'), 856 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 857 'md5').hexdigest() 858 self._authenticated(user, hashed_pass == valid_hashed_pass) 859 # end AUTH related stuff. 860 861 def smtp_EHLO(self, arg): 862 resp = ('250-testhost\r\n' 863 '250-EXPN\r\n' 864 '250-SIZE 20000000\r\n' 865 '250-STARTTLS\r\n' 866 '250-DELIVERBY\r\n') 867 resp = resp + self._extrafeatures + '250 HELP' 868 self.push(resp) 869 self.seen_greeting = arg 870 self.extended_smtp = True 871 872 def smtp_VRFY(self, arg): 873 # For max compatibility smtplib should be sending the raw address. 874 if arg in sim_users: 875 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 876 else: 877 self.push('550 No such user: %s' % arg) 878 879 def smtp_EXPN(self, arg): 880 list_name = arg.lower() 881 if list_name in sim_lists: 882 user_list = sim_lists[list_name] 883 for n, user_email in enumerate(user_list): 884 quoted_addr = smtplib.quoteaddr(user_email) 885 if n < len(user_list) - 1: 886 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 887 else: 888 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 889 else: 890 self.push('550 No access for you!') 891 892 def smtp_QUIT(self, arg): 893 if self.quit_response is None: 894 super(SimSMTPChannel, self).smtp_QUIT(arg) 895 else: 896 self.push(self.quit_response) 897 self.close_when_done() 898 899 def smtp_MAIL(self, arg): 900 if self.mail_response is None: 901 super().smtp_MAIL(arg) 902 else: 903 self.push(self.mail_response) 904 if self.disconnect: 905 self.close_when_done() 906 907 def smtp_RCPT(self, arg): 908 if self.rcpt_response is None: 909 super().smtp_RCPT(arg) 910 return 911 self.rcpt_count += 1 912 self.push(self.rcpt_response[self.rcpt_count-1]) 913 914 def smtp_RSET(self, arg): 915 self.rset_count += 1 916 super().smtp_RSET(arg) 917 918 def smtp_DATA(self, arg): 919 if self.data_response is None: 920 super().smtp_DATA(arg) 921 else: 922 self.push(self.data_response) 923 924 def handle_error(self): 925 raise 926 927 928class SimSMTPServer(smtpd.SMTPServer): 929 930 channel_class = SimSMTPChannel 931 932 def __init__(self, *args, **kw): 933 self._extra_features = [] 934 self._addresses = {} 935 smtpd.SMTPServer.__init__(self, *args, **kw) 936 937 def handle_accepted(self, conn, addr): 938 self._SMTPchannel = self.channel_class( 939 self._extra_features, self, conn, addr, 940 decode_data=self._decode_data) 941 942 def process_message(self, peer, mailfrom, rcpttos, data): 943 self._addresses['from'] = mailfrom 944 self._addresses['tos'] = rcpttos 945 946 def add_feature(self, feature): 947 self._extra_features.append(feature) 948 949 def handle_error(self): 950 raise 951 952 953# Test various SMTP & ESMTP commands/behaviors that require a simulated server 954# (i.e., something with more features than DebuggingServer) 955class SMTPSimTests(unittest.TestCase): 956 957 def setUp(self): 958 self.thread_key = threading_setup() 959 self.real_getfqdn = socket.getfqdn 960 socket.getfqdn = mock_socket.getfqdn 961 self.serv_evt = threading.Event() 962 self.client_evt = threading.Event() 963 # Pick a random unused port by passing 0 for the port number 964 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 965 # Keep a note of what port was assigned 966 self.port = self.serv.socket.getsockname()[1] 967 serv_args = (self.serv, self.serv_evt, self.client_evt) 968 self.thread = threading.Thread(target=debugging_server, args=serv_args) 969 self.thread.start() 970 971 # wait until server thread has assigned a port number 972 self.serv_evt.wait() 973 self.serv_evt.clear() 974 975 def tearDown(self): 976 socket.getfqdn = self.real_getfqdn 977 # indicate that the client is finished 978 self.client_evt.set() 979 # wait for the server thread to terminate 980 self.serv_evt.wait() 981 join_thread(self.thread) 982 del self.thread 983 self.doCleanups() 984 threading_cleanup(*self.thread_key) 985 986 def testBasic(self): 987 # smoke test 988 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 989 timeout=support.LOOPBACK_TIMEOUT) 990 smtp.quit() 991 992 def testEHLO(self): 993 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 994 timeout=support.LOOPBACK_TIMEOUT) 995 996 # no features should be present before the EHLO 997 self.assertEqual(smtp.esmtp_features, {}) 998 999 # features expected from the test server 1000 expected_features = {'expn':'', 1001 'size': '20000000', 1002 'starttls': '', 1003 'deliverby': '', 1004 'help': '', 1005 } 1006 1007 smtp.ehlo() 1008 self.assertEqual(smtp.esmtp_features, expected_features) 1009 for k in expected_features: 1010 self.assertTrue(smtp.has_extn(k)) 1011 self.assertFalse(smtp.has_extn('unsupported-feature')) 1012 smtp.quit() 1013 1014 def testVRFY(self): 1015 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1016 timeout=support.LOOPBACK_TIMEOUT) 1017 1018 for addr_spec, name in sim_users.items(): 1019 expected_known = (250, bytes('%s %s' % 1020 (name, smtplib.quoteaddr(addr_spec)), 1021 "ascii")) 1022 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 1023 1024 u = 'nobody@nowhere.com' 1025 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 1026 self.assertEqual(smtp.vrfy(u), expected_unknown) 1027 smtp.quit() 1028 1029 def testEXPN(self): 1030 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1031 timeout=support.LOOPBACK_TIMEOUT) 1032 1033 for listname, members in sim_lists.items(): 1034 users = [] 1035 for m in members: 1036 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 1037 expected_known = (250, bytes('\n'.join(users), "ascii")) 1038 self.assertEqual(smtp.expn(listname), expected_known) 1039 1040 u = 'PSU-Members-List' 1041 expected_unknown = (550, b'No access for you!') 1042 self.assertEqual(smtp.expn(u), expected_unknown) 1043 smtp.quit() 1044 1045 def testAUTH_PLAIN(self): 1046 self.serv.add_feature("AUTH PLAIN") 1047 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1048 timeout=support.LOOPBACK_TIMEOUT) 1049 resp = smtp.login(sim_auth[0], sim_auth[1]) 1050 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1051 smtp.close() 1052 1053 def testAUTH_LOGIN(self): 1054 self.serv.add_feature("AUTH LOGIN") 1055 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1056 timeout=support.LOOPBACK_TIMEOUT) 1057 resp = smtp.login(sim_auth[0], sim_auth[1]) 1058 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1059 smtp.close() 1060 1061 @hashlib_helper.requires_hashdigest('md5') 1062 def testAUTH_CRAM_MD5(self): 1063 self.serv.add_feature("AUTH CRAM-MD5") 1064 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1065 timeout=support.LOOPBACK_TIMEOUT) 1066 resp = smtp.login(sim_auth[0], sim_auth[1]) 1067 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1068 smtp.close() 1069 1070 @hashlib_helper.requires_hashdigest('md5') 1071 def testAUTH_multiple(self): 1072 # Test that multiple authentication methods are tried. 1073 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1074 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1075 timeout=support.LOOPBACK_TIMEOUT) 1076 resp = smtp.login(sim_auth[0], sim_auth[1]) 1077 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1078 smtp.close() 1079 1080 def test_auth_function(self): 1081 supported = {'PLAIN', 'LOGIN'} 1082 try: 1083 hashlib.md5() 1084 except ValueError: 1085 pass 1086 else: 1087 supported.add('CRAM-MD5') 1088 for mechanism in supported: 1089 self.serv.add_feature("AUTH {}".format(mechanism)) 1090 for mechanism in supported: 1091 with self.subTest(mechanism=mechanism): 1092 smtp = smtplib.SMTP(HOST, self.port, 1093 local_hostname='localhost', 1094 timeout=support.LOOPBACK_TIMEOUT) 1095 smtp.ehlo('foo') 1096 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1097 method = 'auth_' + mechanism.lower().replace('-', '_') 1098 resp = smtp.auth(mechanism, getattr(smtp, method)) 1099 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1100 smtp.close() 1101 1102 def test_quit_resets_greeting(self): 1103 smtp = smtplib.SMTP(HOST, self.port, 1104 local_hostname='localhost', 1105 timeout=support.LOOPBACK_TIMEOUT) 1106 code, message = smtp.ehlo() 1107 self.assertEqual(code, 250) 1108 self.assertIn('size', smtp.esmtp_features) 1109 smtp.quit() 1110 self.assertNotIn('size', smtp.esmtp_features) 1111 smtp.connect(HOST, self.port) 1112 self.assertNotIn('size', smtp.esmtp_features) 1113 smtp.ehlo_or_helo_if_needed() 1114 self.assertIn('size', smtp.esmtp_features) 1115 smtp.quit() 1116 1117 def test_with_statement(self): 1118 with smtplib.SMTP(HOST, self.port) as smtp: 1119 code, message = smtp.noop() 1120 self.assertEqual(code, 250) 1121 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1122 with smtplib.SMTP(HOST, self.port) as smtp: 1123 smtp.close() 1124 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1125 1126 def test_with_statement_QUIT_failure(self): 1127 with self.assertRaises(smtplib.SMTPResponseException) as error: 1128 with smtplib.SMTP(HOST, self.port) as smtp: 1129 smtp.noop() 1130 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1131 self.assertEqual(error.exception.smtp_code, 421) 1132 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1133 1134 #TODO: add tests for correct AUTH method fallback now that the 1135 #test infrastructure can support it. 1136 1137 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1138 def test__rest_from_mail_cmd(self): 1139 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1140 timeout=support.LOOPBACK_TIMEOUT) 1141 smtp.noop() 1142 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1143 self.serv._SMTPchannel.disconnect = True 1144 with self.assertRaises(smtplib.SMTPSenderRefused): 1145 smtp.sendmail('John', 'Sally', 'test message') 1146 self.assertIsNone(smtp.sock) 1147 1148 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1149 def test_421_from_mail_cmd(self): 1150 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1151 timeout=support.LOOPBACK_TIMEOUT) 1152 smtp.noop() 1153 self.serv._SMTPchannel.mail_response = '421 closing connection' 1154 with self.assertRaises(smtplib.SMTPSenderRefused): 1155 smtp.sendmail('John', 'Sally', 'test message') 1156 self.assertIsNone(smtp.sock) 1157 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1158 1159 def test_421_from_rcpt_cmd(self): 1160 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1161 timeout=support.LOOPBACK_TIMEOUT) 1162 smtp.noop() 1163 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1164 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1165 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1166 self.assertIsNone(smtp.sock) 1167 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1168 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1169 1170 def test_421_from_data_cmd(self): 1171 class MySimSMTPChannel(SimSMTPChannel): 1172 def found_terminator(self): 1173 if self.smtp_state == self.DATA: 1174 self.push('421 closing') 1175 else: 1176 super().found_terminator() 1177 self.serv.channel_class = MySimSMTPChannel 1178 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1179 timeout=support.LOOPBACK_TIMEOUT) 1180 smtp.noop() 1181 with self.assertRaises(smtplib.SMTPDataError): 1182 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1183 self.assertIsNone(smtp.sock) 1184 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1185 1186 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1187 smtp = smtplib.SMTP( 1188 HOST, self.port, local_hostname='localhost', 1189 timeout=support.LOOPBACK_TIMEOUT) 1190 self.addCleanup(smtp.close) 1191 smtp.ehlo() 1192 self.assertTrue(smtp.does_esmtp) 1193 self.assertFalse(smtp.has_extn('smtputf8')) 1194 self.assertRaises( 1195 smtplib.SMTPNotSupportedError, 1196 smtp.sendmail, 1197 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1198 self.assertRaises( 1199 smtplib.SMTPNotSupportedError, 1200 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1201 1202 def test_send_unicode_without_SMTPUTF8(self): 1203 smtp = smtplib.SMTP( 1204 HOST, self.port, local_hostname='localhost', 1205 timeout=support.LOOPBACK_TIMEOUT) 1206 self.addCleanup(smtp.close) 1207 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1208 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1209 1210 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1211 # This test is located here and not in the SMTPUTF8SimTests 1212 # class because it needs a "regular" SMTP server to work 1213 msg = EmailMessage() 1214 msg['From'] = "Páolo <főo@bar.com>" 1215 msg['To'] = 'Dinsdale' 1216 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1217 smtp = smtplib.SMTP( 1218 HOST, self.port, local_hostname='localhost', 1219 timeout=support.LOOPBACK_TIMEOUT) 1220 self.addCleanup(smtp.close) 1221 with self.assertRaises(smtplib.SMTPNotSupportedError): 1222 smtp.send_message(msg) 1223 1224 def test_name_field_not_included_in_envelop_addresses(self): 1225 smtp = smtplib.SMTP( 1226 HOST, self.port, local_hostname='localhost', 1227 timeout=support.LOOPBACK_TIMEOUT) 1228 self.addCleanup(smtp.close) 1229 1230 message = EmailMessage() 1231 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1232 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1233 1234 self.assertDictEqual(smtp.send_message(message), {}) 1235 1236 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1237 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1238 1239 1240class SimSMTPUTF8Server(SimSMTPServer): 1241 1242 def __init__(self, *args, **kw): 1243 # The base SMTP server turns these on automatically, but our test 1244 # server is set up to munge the EHLO response, so we need to provide 1245 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1246 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1247 smtpd.SMTPServer.__init__(self, *args, **kw) 1248 1249 def handle_accepted(self, conn, addr): 1250 self._SMTPchannel = self.channel_class( 1251 self._extra_features, self, conn, addr, 1252 decode_data=self._decode_data, 1253 enable_SMTPUTF8=self.enable_SMTPUTF8, 1254 ) 1255 1256 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1257 rcpt_options=None): 1258 self.last_peer = peer 1259 self.last_mailfrom = mailfrom 1260 self.last_rcpttos = rcpttos 1261 self.last_message = data 1262 self.last_mail_options = mail_options 1263 self.last_rcpt_options = rcpt_options 1264 1265 1266class SMTPUTF8SimTests(unittest.TestCase): 1267 1268 maxDiff = None 1269 1270 def setUp(self): 1271 self.thread_key = threading_setup() 1272 self.real_getfqdn = socket.getfqdn 1273 socket.getfqdn = mock_socket.getfqdn 1274 self.serv_evt = threading.Event() 1275 self.client_evt = threading.Event() 1276 # Pick a random unused port by passing 0 for the port number 1277 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1278 decode_data=False, 1279 enable_SMTPUTF8=True) 1280 # Keep a note of what port was assigned 1281 self.port = self.serv.socket.getsockname()[1] 1282 serv_args = (self.serv, self.serv_evt, self.client_evt) 1283 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1284 self.thread.start() 1285 1286 # wait until server thread has assigned a port number 1287 self.serv_evt.wait() 1288 self.serv_evt.clear() 1289 1290 def tearDown(self): 1291 socket.getfqdn = self.real_getfqdn 1292 # indicate that the client is finished 1293 self.client_evt.set() 1294 # wait for the server thread to terminate 1295 self.serv_evt.wait() 1296 join_thread(self.thread) 1297 del self.thread 1298 self.doCleanups() 1299 threading_cleanup(*self.thread_key) 1300 1301 def test_test_server_supports_extensions(self): 1302 smtp = smtplib.SMTP( 1303 HOST, self.port, local_hostname='localhost', 1304 timeout=support.LOOPBACK_TIMEOUT) 1305 self.addCleanup(smtp.close) 1306 smtp.ehlo() 1307 self.assertTrue(smtp.does_esmtp) 1308 self.assertTrue(smtp.has_extn('smtputf8')) 1309 1310 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1311 m = '¡a test message containing unicode!'.encode('utf-8') 1312 smtp = smtplib.SMTP( 1313 HOST, self.port, local_hostname='localhost', 1314 timeout=support.LOOPBACK_TIMEOUT) 1315 self.addCleanup(smtp.close) 1316 smtp.sendmail('Jőhn', 'Sálly', m, 1317 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1318 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1319 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1320 self.assertEqual(self.serv.last_message, m) 1321 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1322 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1323 self.assertEqual(self.serv.last_rcpt_options, []) 1324 1325 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1326 m = '¡a test message containing unicode!'.encode('utf-8') 1327 smtp = smtplib.SMTP( 1328 HOST, self.port, local_hostname='localhost', 1329 timeout=support.LOOPBACK_TIMEOUT) 1330 self.addCleanup(smtp.close) 1331 smtp.ehlo() 1332 self.assertEqual( 1333 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1334 (250, b'OK')) 1335 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1336 self.assertEqual(smtp.data(m), (250, b'OK')) 1337 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1338 self.assertEqual(self.serv.last_rcpttos, ['János']) 1339 self.assertEqual(self.serv.last_message, m) 1340 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1341 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1342 self.assertEqual(self.serv.last_rcpt_options, []) 1343 1344 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1345 msg = EmailMessage() 1346 msg['From'] = "Páolo <főo@bar.com>" 1347 msg['To'] = 'Dinsdale' 1348 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1349 # XXX I don't know why I need two \n's here, but this is an existing 1350 # bug (if it is one) and not a problem with the new functionality. 1351 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1352 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1353 # we are successfully sending /r/n :(. 1354 expected = textwrap.dedent("""\ 1355 From: Páolo <főo@bar.com> 1356 To: Dinsdale 1357 Subject: Nudge nudge, wink, wink \u1F609 1358 Content-Type: text/plain; charset="utf-8" 1359 Content-Transfer-Encoding: 8bit 1360 MIME-Version: 1.0 1361 1362 oh là là, know what I mean, know what I mean? 1363 """) 1364 smtp = smtplib.SMTP( 1365 HOST, self.port, local_hostname='localhost', 1366 timeout=support.LOOPBACK_TIMEOUT) 1367 self.addCleanup(smtp.close) 1368 self.assertEqual(smtp.send_message(msg), {}) 1369 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1370 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1371 self.assertEqual(self.serv.last_message.decode(), expected) 1372 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1373 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1374 self.assertEqual(self.serv.last_rcpt_options, []) 1375 1376 1377EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1378 1379class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1380 def smtp_AUTH(self, arg): 1381 # RFC 4954's AUTH command allows for an optional initial-response. 1382 # Not all AUTH methods support this; some require a challenge. AUTH 1383 # PLAIN does those, so test that here. See issue #15014. 1384 args = arg.split() 1385 if args[0].lower() == 'plain': 1386 if len(args) == 2: 1387 # AUTH PLAIN <initial-response> with the response base 64 1388 # encoded. Hard code the expected response for the test. 1389 if args[1] == EXPECTED_RESPONSE: 1390 self.push('235 Ok') 1391 return 1392 self.push('571 Bad authentication') 1393 1394class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1395 channel_class = SimSMTPAUTHInitialResponseChannel 1396 1397 1398class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1399 def setUp(self): 1400 self.thread_key = threading_setup() 1401 self.real_getfqdn = socket.getfqdn 1402 socket.getfqdn = mock_socket.getfqdn 1403 self.serv_evt = threading.Event() 1404 self.client_evt = threading.Event() 1405 # Pick a random unused port by passing 0 for the port number 1406 self.serv = SimSMTPAUTHInitialResponseServer( 1407 (HOST, 0), ('nowhere', -1), decode_data=True) 1408 # Keep a note of what port was assigned 1409 self.port = self.serv.socket.getsockname()[1] 1410 serv_args = (self.serv, self.serv_evt, self.client_evt) 1411 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1412 self.thread.start() 1413 1414 # wait until server thread has assigned a port number 1415 self.serv_evt.wait() 1416 self.serv_evt.clear() 1417 1418 def tearDown(self): 1419 socket.getfqdn = self.real_getfqdn 1420 # indicate that the client is finished 1421 self.client_evt.set() 1422 # wait for the server thread to terminate 1423 self.serv_evt.wait() 1424 join_thread(self.thread) 1425 del self.thread 1426 self.doCleanups() 1427 threading_cleanup(*self.thread_key) 1428 1429 def testAUTH_PLAIN_initial_response_login(self): 1430 self.serv.add_feature('AUTH PLAIN') 1431 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1432 timeout=support.LOOPBACK_TIMEOUT) 1433 smtp.login('psu', 'doesnotexist') 1434 smtp.close() 1435 1436 def testAUTH_PLAIN_initial_response_auth(self): 1437 self.serv.add_feature('AUTH PLAIN') 1438 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1439 timeout=support.LOOPBACK_TIMEOUT) 1440 smtp.user = 'psu' 1441 smtp.password = 'doesnotexist' 1442 code, response = smtp.auth('plain', smtp.auth_plain) 1443 smtp.close() 1444 self.assertEqual(code, 235) 1445 1446 1447if __name__ == '__main__': 1448 unittest.main() 1449