1import asyncore 2import base64 3import email.mime.text 4from email.message import EmailMessage 5from email.base64mime import body_encode as encode_base64 6import email.utils 7import hashlib 8import hmac 9import socket 10import smtpd 11import smtplib 12import io 13import re 14import sys 15import time 16import select 17import errno 18import textwrap 19import threading 20 21import unittest 22from test import support, mock_socket 23from test.support import HOST 24from test.support import threading_setup, threading_cleanup, join_thread 25from test.support import requires_hashdigest 26from unittest.mock import Mock 27 28 29if sys.platform == 'darwin': 30 # select.poll returns a select.POLLHUP at the end of the tests 31 # on darwin, so just ignore it 32 def handle_expt(self): 33 pass 34 smtpd.SMTPChannel.handle_expt = handle_expt 35 36 37def server(evt, buf, serv): 38 serv.listen() 39 evt.set() 40 try: 41 conn, addr = serv.accept() 42 except socket.timeout: 43 pass 44 else: 45 n = 500 46 while buf and n > 0: 47 r, w, e = select.select([], [conn], []) 48 if w: 49 sent = conn.send(buf) 50 buf = buf[sent:] 51 52 n -= 1 53 54 conn.close() 55 finally: 56 serv.close() 57 evt.set() 58 59class GeneralTests(unittest.TestCase): 60 61 def setUp(self): 62 smtplib.socket = mock_socket 63 self.port = 25 64 65 def tearDown(self): 66 smtplib.socket = socket 67 68 # This method is no longer used but is retained for backward compatibility, 69 # so test to make sure it still works. 70 def testQuoteData(self): 71 teststr = "abc\n.jkl\rfoo\r\n..blue" 72 expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 73 self.assertEqual(expected, smtplib.quotedata(teststr)) 74 75 def testBasic1(self): 76 mock_socket.reply_with(b"220 Hola mundo") 77 # connects 78 smtp = smtplib.SMTP(HOST, self.port) 79 smtp.close() 80 81 def testSourceAddress(self): 82 mock_socket.reply_with(b"220 Hola mundo") 83 # connects 84 smtp = smtplib.SMTP(HOST, self.port, 85 source_address=('127.0.0.1',19876)) 86 self.assertEqual(smtp.source_address, ('127.0.0.1', 19876)) 87 smtp.close() 88 89 def testBasic2(self): 90 mock_socket.reply_with(b"220 Hola mundo") 91 # connects, include port in host name 92 smtp = smtplib.SMTP("%s:%s" % (HOST, self.port)) 93 smtp.close() 94 95 def testLocalHostName(self): 96 mock_socket.reply_with(b"220 Hola mundo") 97 # check that supplied local_hostname is used 98 smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost") 99 self.assertEqual(smtp.local_hostname, "testhost") 100 smtp.close() 101 102 def testTimeoutDefault(self): 103 mock_socket.reply_with(b"220 Hola mundo") 104 self.assertIsNone(mock_socket.getdefaulttimeout()) 105 mock_socket.setdefaulttimeout(30) 106 self.assertEqual(mock_socket.getdefaulttimeout(), 30) 107 try: 108 smtp = smtplib.SMTP(HOST, self.port) 109 finally: 110 mock_socket.setdefaulttimeout(None) 111 self.assertEqual(smtp.sock.gettimeout(), 30) 112 smtp.close() 113 114 def testTimeoutNone(self): 115 mock_socket.reply_with(b"220 Hola mundo") 116 self.assertIsNone(socket.getdefaulttimeout()) 117 socket.setdefaulttimeout(30) 118 try: 119 smtp = smtplib.SMTP(HOST, self.port, timeout=None) 120 finally: 121 socket.setdefaulttimeout(None) 122 self.assertIsNone(smtp.sock.gettimeout()) 123 smtp.close() 124 125 def testTimeoutValue(self): 126 mock_socket.reply_with(b"220 Hola mundo") 127 smtp = smtplib.SMTP(HOST, self.port, timeout=30) 128 self.assertEqual(smtp.sock.gettimeout(), 30) 129 smtp.close() 130 131 def test_debuglevel(self): 132 mock_socket.reply_with(b"220 Hello world") 133 smtp = smtplib.SMTP() 134 smtp.set_debuglevel(1) 135 with support.captured_stderr() as stderr: 136 smtp.connect(HOST, self.port) 137 smtp.close() 138 expected = re.compile(r"^connect:", re.MULTILINE) 139 self.assertRegex(stderr.getvalue(), expected) 140 141 def test_debuglevel_2(self): 142 mock_socket.reply_with(b"220 Hello world") 143 smtp = smtplib.SMTP() 144 smtp.set_debuglevel(2) 145 with support.captured_stderr() as stderr: 146 smtp.connect(HOST, self.port) 147 smtp.close() 148 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 149 re.MULTILINE) 150 self.assertRegex(stderr.getvalue(), expected) 151 152 153# Test server thread using the specified SMTP server class 154def debugging_server(serv, serv_evt, client_evt): 155 serv_evt.set() 156 157 try: 158 if hasattr(select, 'poll'): 159 poll_fun = asyncore.poll2 160 else: 161 poll_fun = asyncore.poll 162 163 n = 1000 164 while asyncore.socket_map and n > 0: 165 poll_fun(0.01, asyncore.socket_map) 166 167 # when the client conversation is finished, it will 168 # set client_evt, and it's then ok to kill the server 169 if client_evt.is_set(): 170 serv.close() 171 break 172 173 n -= 1 174 175 except socket.timeout: 176 pass 177 finally: 178 if not client_evt.is_set(): 179 # allow some time for the client to read the result 180 time.sleep(0.5) 181 serv.close() 182 asyncore.close_all() 183 serv_evt.set() 184 185MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 186MSG_END = '------------ END MESSAGE ------------\n' 187 188# NOTE: Some SMTP objects in the tests below are created with a non-default 189# local_hostname argument to the constructor, since (on some systems) the FQDN 190# lookup caused by the default local_hostname sometimes takes so long that the 191# test server times out, causing the test to fail. 192 193# Test behavior of smtpd.DebuggingServer 194class DebuggingServerTests(unittest.TestCase): 195 196 maxDiff = None 197 198 def setUp(self): 199 self.thread_key = threading_setup() 200 self.real_getfqdn = socket.getfqdn 201 socket.getfqdn = mock_socket.getfqdn 202 # temporarily replace sys.stdout to capture DebuggingServer output 203 self.old_stdout = sys.stdout 204 self.output = io.StringIO() 205 sys.stdout = self.output 206 207 self.serv_evt = threading.Event() 208 self.client_evt = threading.Event() 209 # Capture SMTPChannel debug output 210 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 211 smtpd.DEBUGSTREAM = io.StringIO() 212 # Pick a random unused port by passing 0 for the port number 213 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 214 decode_data=True) 215 # Keep a note of what server host and port were assigned 216 self.host, self.port = self.serv.socket.getsockname()[:2] 217 serv_args = (self.serv, self.serv_evt, self.client_evt) 218 self.thread = threading.Thread(target=debugging_server, args=serv_args) 219 self.thread.start() 220 221 # wait until server thread has assigned a port number 222 self.serv_evt.wait() 223 self.serv_evt.clear() 224 225 def tearDown(self): 226 socket.getfqdn = self.real_getfqdn 227 # indicate that the client is finished 228 self.client_evt.set() 229 # wait for the server thread to terminate 230 self.serv_evt.wait() 231 join_thread(self.thread) 232 # restore sys.stdout 233 sys.stdout = self.old_stdout 234 # restore DEBUGSTREAM 235 smtpd.DEBUGSTREAM.close() 236 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 237 del self.thread 238 self.doCleanups() 239 threading_cleanup(*self.thread_key) 240 241 def get_output_without_xpeer(self): 242 test_output = self.output.getvalue() 243 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 244 test_output, flags=re.MULTILINE|re.DOTALL) 245 246 def testBasic(self): 247 # connect 248 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 249 smtp.quit() 250 251 def testSourceAddress(self): 252 # connect 253 src_port = support.find_unused_port() 254 try: 255 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 256 timeout=3, source_address=(self.host, src_port)) 257 self.addCleanup(smtp.close) 258 self.assertEqual(smtp.source_address, (self.host, src_port)) 259 self.assertEqual(smtp.local_hostname, 'localhost') 260 smtp.quit() 261 except OSError as e: 262 if e.errno == errno.EADDRINUSE: 263 self.skipTest("couldn't bind to source port %d" % src_port) 264 raise 265 266 def testNOOP(self): 267 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 268 self.addCleanup(smtp.close) 269 expected = (250, b'OK') 270 self.assertEqual(smtp.noop(), expected) 271 smtp.quit() 272 273 def testRSET(self): 274 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 275 self.addCleanup(smtp.close) 276 expected = (250, b'OK') 277 self.assertEqual(smtp.rset(), expected) 278 smtp.quit() 279 280 def testELHO(self): 281 # EHLO isn't implemented in DebuggingServer 282 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 283 self.addCleanup(smtp.close) 284 expected = (250, b'\nSIZE 33554432\nHELP') 285 self.assertEqual(smtp.ehlo(), expected) 286 smtp.quit() 287 288 def testEXPNNotImplemented(self): 289 # EXPN isn't implemented in DebuggingServer 290 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 291 self.addCleanup(smtp.close) 292 expected = (502, b'EXPN not implemented') 293 smtp.putcmd('EXPN') 294 self.assertEqual(smtp.getreply(), expected) 295 smtp.quit() 296 297 def testVRFY(self): 298 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 299 self.addCleanup(smtp.close) 300 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 301 b'and attempt delivery') 302 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 303 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 304 smtp.quit() 305 306 def testSecondHELO(self): 307 # check that a second HELO returns a message that it's a duplicate 308 # (this behavior is specific to smtpd.SMTPChannel) 309 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 310 self.addCleanup(smtp.close) 311 smtp.helo() 312 expected = (503, b'Duplicate HELO/EHLO') 313 self.assertEqual(smtp.helo(), expected) 314 smtp.quit() 315 316 def testHELP(self): 317 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 318 self.addCleanup(smtp.close) 319 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 320 b'RCPT DATA RSET NOOP QUIT VRFY') 321 smtp.quit() 322 323 def testSend(self): 324 # connect and send mail 325 m = 'A test message' 326 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 327 self.addCleanup(smtp.close) 328 smtp.sendmail('John', 'Sally', m) 329 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 330 # in asyncore. This sleep might help, but should really be fixed 331 # properly by using an Event variable. 332 time.sleep(0.01) 333 smtp.quit() 334 335 self.client_evt.set() 336 self.serv_evt.wait() 337 self.output.flush() 338 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 339 self.assertEqual(self.output.getvalue(), mexpect) 340 341 def testSendBinary(self): 342 m = b'A test message' 343 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 344 self.addCleanup(smtp.close) 345 smtp.sendmail('John', 'Sally', m) 346 # XXX (see comment in testSend) 347 time.sleep(0.01) 348 smtp.quit() 349 350 self.client_evt.set() 351 self.serv_evt.wait() 352 self.output.flush() 353 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 354 self.assertEqual(self.output.getvalue(), mexpect) 355 356 def testSendNeedingDotQuote(self): 357 # Issue 12283 358 m = '.A test\n.mes.sage.' 359 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 360 self.addCleanup(smtp.close) 361 smtp.sendmail('John', 'Sally', m) 362 # XXX (see comment in testSend) 363 time.sleep(0.01) 364 smtp.quit() 365 366 self.client_evt.set() 367 self.serv_evt.wait() 368 self.output.flush() 369 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 370 self.assertEqual(self.output.getvalue(), mexpect) 371 372 def testSendNullSender(self): 373 m = 'A test message' 374 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 375 self.addCleanup(smtp.close) 376 smtp.sendmail('<>', 'Sally', m) 377 # XXX (see comment in testSend) 378 time.sleep(0.01) 379 smtp.quit() 380 381 self.client_evt.set() 382 self.serv_evt.wait() 383 self.output.flush() 384 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 385 self.assertEqual(self.output.getvalue(), mexpect) 386 debugout = smtpd.DEBUGSTREAM.getvalue() 387 sender = re.compile("^sender: <>$", re.MULTILINE) 388 self.assertRegex(debugout, sender) 389 390 def testSendMessage(self): 391 m = email.mime.text.MIMEText('A test message') 392 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 393 self.addCleanup(smtp.close) 394 smtp.send_message(m, from_addr='John', to_addrs='Sally') 395 # XXX (see comment in testSend) 396 time.sleep(0.01) 397 smtp.quit() 398 399 self.client_evt.set() 400 self.serv_evt.wait() 401 self.output.flush() 402 # Remove the X-Peer header that DebuggingServer adds as figuring out 403 # exactly what IP address format is put there is not easy (and 404 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 405 # not always the same as socket.gethostbyname(HOST). :( 406 test_output = self.get_output_without_xpeer() 407 del m['X-Peer'] 408 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 409 self.assertEqual(test_output, mexpect) 410 411 def testSendMessageWithAddresses(self): 412 m = email.mime.text.MIMEText('A test message') 413 m['From'] = 'foo@bar.com' 414 m['To'] = 'John' 415 m['CC'] = 'Sally, Fred' 416 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 417 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 418 self.addCleanup(smtp.close) 419 smtp.send_message(m) 420 # XXX (see comment in testSend) 421 time.sleep(0.01) 422 smtp.quit() 423 # make sure the Bcc header is still in the message. 424 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 425 '<warped@silly.walks.com>') 426 427 self.client_evt.set() 428 self.serv_evt.wait() 429 self.output.flush() 430 # Remove the X-Peer header that DebuggingServer adds. 431 test_output = self.get_output_without_xpeer() 432 del m['X-Peer'] 433 # The Bcc header should not be transmitted. 434 del m['Bcc'] 435 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 436 self.assertEqual(test_output, mexpect) 437 debugout = smtpd.DEBUGSTREAM.getvalue() 438 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 439 self.assertRegex(debugout, sender) 440 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 441 'warped@silly.walks.com'): 442 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 443 re.MULTILINE) 444 self.assertRegex(debugout, to_addr) 445 446 def testSendMessageWithSomeAddresses(self): 447 # Make sure nothing breaks if not all of the three 'to' headers exist 448 m = email.mime.text.MIMEText('A test message') 449 m['From'] = 'foo@bar.com' 450 m['To'] = 'John, Dinsdale' 451 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 452 self.addCleanup(smtp.close) 453 smtp.send_message(m) 454 # XXX (see comment in testSend) 455 time.sleep(0.01) 456 smtp.quit() 457 458 self.client_evt.set() 459 self.serv_evt.wait() 460 self.output.flush() 461 # Remove the X-Peer header that DebuggingServer adds. 462 test_output = self.get_output_without_xpeer() 463 del m['X-Peer'] 464 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 465 self.assertEqual(test_output, mexpect) 466 debugout = smtpd.DEBUGSTREAM.getvalue() 467 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 468 self.assertRegex(debugout, sender) 469 for addr in ('John', 'Dinsdale'): 470 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 471 re.MULTILINE) 472 self.assertRegex(debugout, to_addr) 473 474 def testSendMessageWithSpecifiedAddresses(self): 475 # Make sure addresses specified in call override those in message. 476 m = email.mime.text.MIMEText('A test message') 477 m['From'] = 'foo@bar.com' 478 m['To'] = 'John, Dinsdale' 479 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 480 self.addCleanup(smtp.close) 481 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 482 # XXX (see comment in testSend) 483 time.sleep(0.01) 484 smtp.quit() 485 486 self.client_evt.set() 487 self.serv_evt.wait() 488 self.output.flush() 489 # Remove the X-Peer header that DebuggingServer adds. 490 test_output = self.get_output_without_xpeer() 491 del m['X-Peer'] 492 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 493 self.assertEqual(test_output, mexpect) 494 debugout = smtpd.DEBUGSTREAM.getvalue() 495 sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 496 self.assertRegex(debugout, sender) 497 for addr in ('John', 'Dinsdale'): 498 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 499 re.MULTILINE) 500 self.assertNotRegex(debugout, to_addr) 501 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 502 self.assertRegex(debugout, recip) 503 504 def testSendMessageWithMultipleFrom(self): 505 # Sender overrides To 506 m = email.mime.text.MIMEText('A test message') 507 m['From'] = 'Bernard, Bianca' 508 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 509 m['To'] = 'John, Dinsdale' 510 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 511 self.addCleanup(smtp.close) 512 smtp.send_message(m) 513 # XXX (see comment in testSend) 514 time.sleep(0.01) 515 smtp.quit() 516 517 self.client_evt.set() 518 self.serv_evt.wait() 519 self.output.flush() 520 # Remove the X-Peer header that DebuggingServer adds. 521 test_output = self.get_output_without_xpeer() 522 del m['X-Peer'] 523 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 524 self.assertEqual(test_output, mexpect) 525 debugout = smtpd.DEBUGSTREAM.getvalue() 526 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 527 self.assertRegex(debugout, sender) 528 for addr in ('John', 'Dinsdale'): 529 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 530 re.MULTILINE) 531 self.assertRegex(debugout, to_addr) 532 533 def testSendMessageResent(self): 534 m = email.mime.text.MIMEText('A test message') 535 m['From'] = 'foo@bar.com' 536 m['To'] = 'John' 537 m['CC'] = 'Sally, Fred' 538 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 539 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 540 m['Resent-From'] = 'holy@grail.net' 541 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 542 m['Resent-Bcc'] = 'doe@losthope.net' 543 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 544 self.addCleanup(smtp.close) 545 smtp.send_message(m) 546 # XXX (see comment in testSend) 547 time.sleep(0.01) 548 smtp.quit() 549 550 self.client_evt.set() 551 self.serv_evt.wait() 552 self.output.flush() 553 # The Resent-Bcc headers are deleted before serialization. 554 del m['Bcc'] 555 del m['Resent-Bcc'] 556 # Remove the X-Peer header that DebuggingServer adds. 557 test_output = self.get_output_without_xpeer() 558 del m['X-Peer'] 559 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 560 self.assertEqual(test_output, mexpect) 561 debugout = smtpd.DEBUGSTREAM.getvalue() 562 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 563 self.assertRegex(debugout, sender) 564 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 565 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 566 re.MULTILINE) 567 self.assertRegex(debugout, to_addr) 568 569 def testSendMessageMultipleResentRaises(self): 570 m = email.mime.text.MIMEText('A test message') 571 m['From'] = 'foo@bar.com' 572 m['To'] = 'John' 573 m['CC'] = 'Sally, Fred' 574 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 575 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 576 m['Resent-From'] = 'holy@grail.net' 577 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 578 m['Resent-Bcc'] = 'doe@losthope.net' 579 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 580 m['Resent-To'] = 'holy@grail.net' 581 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 582 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 583 self.addCleanup(smtp.close) 584 with self.assertRaises(ValueError): 585 smtp.send_message(m) 586 smtp.close() 587 588class NonConnectingTests(unittest.TestCase): 589 590 def testNotConnected(self): 591 # Test various operations on an unconnected SMTP object that 592 # should raise exceptions (at present the attempt in SMTP.send 593 # to reference the nonexistent 'sock' attribute of the SMTP object 594 # causes an AttributeError) 595 smtp = smtplib.SMTP() 596 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 597 self.assertRaises(smtplib.SMTPServerDisconnected, 598 smtp.send, 'test msg') 599 600 def testNonnumericPort(self): 601 # check that non-numeric port raises OSError 602 self.assertRaises(OSError, smtplib.SMTP, 603 "localhost", "bogus") 604 self.assertRaises(OSError, smtplib.SMTP, 605 "localhost:bogus") 606 607 def testSockAttributeExists(self): 608 # check that sock attribute is present outside of a connect() call 609 # (regression test, the previous behavior raised an 610 # AttributeError: 'SMTP' object has no attribute 'sock') 611 with smtplib.SMTP() as smtp: 612 self.assertIsNone(smtp.sock) 613 614 615class DefaultArgumentsTests(unittest.TestCase): 616 617 def setUp(self): 618 self.msg = EmailMessage() 619 self.msg['From'] = 'Páolo <főo@bar.com>' 620 self.smtp = smtplib.SMTP() 621 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 622 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 623 624 def testSendMessage(self): 625 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 626 self.smtp.send_message(self.msg) 627 self.smtp.send_message(self.msg) 628 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 629 expected_mail_options) 630 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 631 expected_mail_options) 632 633 def testSendMessageWithMailOptions(self): 634 mail_options = ['STARTTLS'] 635 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 636 self.smtp.send_message(self.msg, None, None, mail_options) 637 self.assertEqual(mail_options, ['STARTTLS']) 638 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 639 expected_mail_options) 640 641 642# test response of client to a non-successful HELO message 643class BadHELOServerTests(unittest.TestCase): 644 645 def setUp(self): 646 smtplib.socket = mock_socket 647 mock_socket.reply_with(b"199 no hello for you!") 648 self.old_stdout = sys.stdout 649 self.output = io.StringIO() 650 sys.stdout = self.output 651 self.port = 25 652 653 def tearDown(self): 654 smtplib.socket = socket 655 sys.stdout = self.old_stdout 656 657 def testFailingHELO(self): 658 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 659 HOST, self.port, 'localhost', 3) 660 661 662class TooLongLineTests(unittest.TestCase): 663 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 664 665 def setUp(self): 666 self.thread_key = threading_setup() 667 self.old_stdout = sys.stdout 668 self.output = io.StringIO() 669 sys.stdout = self.output 670 671 self.evt = threading.Event() 672 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 673 self.sock.settimeout(15) 674 self.port = support.bind_port(self.sock) 675 servargs = (self.evt, self.respdata, self.sock) 676 self.thread = threading.Thread(target=server, args=servargs) 677 self.thread.start() 678 self.evt.wait() 679 self.evt.clear() 680 681 def tearDown(self): 682 self.evt.wait() 683 sys.stdout = self.old_stdout 684 join_thread(self.thread) 685 del self.thread 686 self.doCleanups() 687 threading_cleanup(*self.thread_key) 688 689 def testLineTooLong(self): 690 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 691 HOST, self.port, 'localhost', 3) 692 693 694sim_users = {'Mr.A@somewhere.com':'John A', 695 'Ms.B@xn--fo-fka.com':'Sally B', 696 'Mrs.C@somewhereesle.com':'Ruth C', 697 } 698 699sim_auth = ('Mr.A@somewhere.com', 'somepassword') 700sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 701 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 702sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 703 'list-2':['Ms.B@xn--fo-fka.com',], 704 } 705 706# Simulated SMTP channel & server 707class ResponseException(Exception): pass 708class SimSMTPChannel(smtpd.SMTPChannel): 709 710 quit_response = None 711 mail_response = None 712 rcpt_response = None 713 data_response = None 714 rcpt_count = 0 715 rset_count = 0 716 disconnect = 0 717 AUTH = 99 # Add protocol state to enable auth testing. 718 authenticated_user = None 719 720 def __init__(self, extra_features, *args, **kw): 721 self._extrafeatures = ''.join( 722 [ "250-{0}\r\n".format(x) for x in extra_features ]) 723 super(SimSMTPChannel, self).__init__(*args, **kw) 724 725 # AUTH related stuff. It would be nice if support for this were in smtpd. 726 def found_terminator(self): 727 if self.smtp_state == self.AUTH: 728 line = self._emptystring.join(self.received_lines) 729 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 730 self.received_lines = [] 731 try: 732 self.auth_object(line) 733 except ResponseException as e: 734 self.smtp_state = self.COMMAND 735 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 736 return 737 super().found_terminator() 738 739 740 def smtp_AUTH(self, arg): 741 if not self.seen_greeting: 742 self.push('503 Error: send EHLO first') 743 return 744 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 745 self.push('500 Error: command "AUTH" not recognized') 746 return 747 if self.authenticated_user is not None: 748 self.push( 749 '503 Bad sequence of commands: already authenticated') 750 return 751 args = arg.split() 752 if len(args) not in [1, 2]: 753 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 754 return 755 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 756 try: 757 self.auth_object = getattr(self, auth_object_name) 758 except AttributeError: 759 self.push('504 Command parameter not implemented: unsupported ' 760 ' authentication mechanism {!r}'.format(auth_object_name)) 761 return 762 self.smtp_state = self.AUTH 763 self.auth_object(args[1] if len(args) == 2 else None) 764 765 def _authenticated(self, user, valid): 766 if valid: 767 self.authenticated_user = user 768 self.push('235 Authentication Succeeded') 769 else: 770 self.push('535 Authentication credentials invalid') 771 self.smtp_state = self.COMMAND 772 773 def _decode_base64(self, string): 774 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 775 776 def _auth_plain(self, arg=None): 777 if arg is None: 778 self.push('334 ') 779 else: 780 logpass = self._decode_base64(arg) 781 try: 782 *_, user, password = logpass.split('\0') 783 except ValueError as e: 784 self.push('535 Splitting response {!r} into user and password' 785 ' failed: {}'.format(logpass, e)) 786 return 787 self._authenticated(user, password == sim_auth[1]) 788 789 def _auth_login(self, arg=None): 790 if arg is None: 791 # base64 encoded 'Username:' 792 self.push('334 VXNlcm5hbWU6') 793 elif not hasattr(self, '_auth_login_user'): 794 self._auth_login_user = self._decode_base64(arg) 795 # base64 encoded 'Password:' 796 self.push('334 UGFzc3dvcmQ6') 797 else: 798 password = self._decode_base64(arg) 799 self._authenticated(self._auth_login_user, password == sim_auth[1]) 800 del self._auth_login_user 801 802 def _auth_cram_md5(self, arg=None): 803 if arg is None: 804 self.push('334 {}'.format(sim_cram_md5_challenge)) 805 else: 806 logpass = self._decode_base64(arg) 807 try: 808 user, hashed_pass = logpass.split() 809 except ValueError as e: 810 self.push('535 Splitting response {!r} into user and password ' 811 'failed: {}'.format(logpass, e)) 812 return False 813 valid_hashed_pass = hmac.HMAC( 814 sim_auth[1].encode('ascii'), 815 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 816 'md5').hexdigest() 817 self._authenticated(user, hashed_pass == valid_hashed_pass) 818 # end AUTH related stuff. 819 820 def smtp_EHLO(self, arg): 821 resp = ('250-testhost\r\n' 822 '250-EXPN\r\n' 823 '250-SIZE 20000000\r\n' 824 '250-STARTTLS\r\n' 825 '250-DELIVERBY\r\n') 826 resp = resp + self._extrafeatures + '250 HELP' 827 self.push(resp) 828 self.seen_greeting = arg 829 self.extended_smtp = True 830 831 def smtp_VRFY(self, arg): 832 # For max compatibility smtplib should be sending the raw address. 833 if arg in sim_users: 834 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 835 else: 836 self.push('550 No such user: %s' % arg) 837 838 def smtp_EXPN(self, arg): 839 list_name = arg.lower() 840 if list_name in sim_lists: 841 user_list = sim_lists[list_name] 842 for n, user_email in enumerate(user_list): 843 quoted_addr = smtplib.quoteaddr(user_email) 844 if n < len(user_list) - 1: 845 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 846 else: 847 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 848 else: 849 self.push('550 No access for you!') 850 851 def smtp_QUIT(self, arg): 852 if self.quit_response is None: 853 super(SimSMTPChannel, self).smtp_QUIT(arg) 854 else: 855 self.push(self.quit_response) 856 self.close_when_done() 857 858 def smtp_MAIL(self, arg): 859 if self.mail_response is None: 860 super().smtp_MAIL(arg) 861 else: 862 self.push(self.mail_response) 863 if self.disconnect: 864 self.close_when_done() 865 866 def smtp_RCPT(self, arg): 867 if self.rcpt_response is None: 868 super().smtp_RCPT(arg) 869 return 870 self.rcpt_count += 1 871 self.push(self.rcpt_response[self.rcpt_count-1]) 872 873 def smtp_RSET(self, arg): 874 self.rset_count += 1 875 super().smtp_RSET(arg) 876 877 def smtp_DATA(self, arg): 878 if self.data_response is None: 879 super().smtp_DATA(arg) 880 else: 881 self.push(self.data_response) 882 883 def handle_error(self): 884 raise 885 886 887class SimSMTPServer(smtpd.SMTPServer): 888 889 channel_class = SimSMTPChannel 890 891 def __init__(self, *args, **kw): 892 self._extra_features = [] 893 self._addresses = {} 894 smtpd.SMTPServer.__init__(self, *args, **kw) 895 896 def handle_accepted(self, conn, addr): 897 self._SMTPchannel = self.channel_class( 898 self._extra_features, self, conn, addr, 899 decode_data=self._decode_data) 900 901 def process_message(self, peer, mailfrom, rcpttos, data): 902 self._addresses['from'] = mailfrom 903 self._addresses['tos'] = rcpttos 904 905 def add_feature(self, feature): 906 self._extra_features.append(feature) 907 908 def handle_error(self): 909 raise 910 911 912# Test various SMTP & ESMTP commands/behaviors that require a simulated server 913# (i.e., something with more features than DebuggingServer) 914class SMTPSimTests(unittest.TestCase): 915 916 def setUp(self): 917 self.thread_key = threading_setup() 918 self.real_getfqdn = socket.getfqdn 919 socket.getfqdn = mock_socket.getfqdn 920 self.serv_evt = threading.Event() 921 self.client_evt = threading.Event() 922 # Pick a random unused port by passing 0 for the port number 923 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 924 # Keep a note of what port was assigned 925 self.port = self.serv.socket.getsockname()[1] 926 serv_args = (self.serv, self.serv_evt, self.client_evt) 927 self.thread = threading.Thread(target=debugging_server, args=serv_args) 928 self.thread.start() 929 930 # wait until server thread has assigned a port number 931 self.serv_evt.wait() 932 self.serv_evt.clear() 933 934 def tearDown(self): 935 socket.getfqdn = self.real_getfqdn 936 # indicate that the client is finished 937 self.client_evt.set() 938 # wait for the server thread to terminate 939 self.serv_evt.wait() 940 join_thread(self.thread) 941 del self.thread 942 self.doCleanups() 943 threading_cleanup(*self.thread_key) 944 945 def testBasic(self): 946 # smoke test 947 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 948 smtp.quit() 949 950 def testEHLO(self): 951 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 952 953 # no features should be present before the EHLO 954 self.assertEqual(smtp.esmtp_features, {}) 955 956 # features expected from the test server 957 expected_features = {'expn':'', 958 'size': '20000000', 959 'starttls': '', 960 'deliverby': '', 961 'help': '', 962 } 963 964 smtp.ehlo() 965 self.assertEqual(smtp.esmtp_features, expected_features) 966 for k in expected_features: 967 self.assertTrue(smtp.has_extn(k)) 968 self.assertFalse(smtp.has_extn('unsupported-feature')) 969 smtp.quit() 970 971 def testVRFY(self): 972 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 973 974 for addr_spec, name in sim_users.items(): 975 expected_known = (250, bytes('%s %s' % 976 (name, smtplib.quoteaddr(addr_spec)), 977 "ascii")) 978 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 979 980 u = 'nobody@nowhere.com' 981 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 982 self.assertEqual(smtp.vrfy(u), expected_unknown) 983 smtp.quit() 984 985 def testEXPN(self): 986 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 987 988 for listname, members in sim_lists.items(): 989 users = [] 990 for m in members: 991 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 992 expected_known = (250, bytes('\n'.join(users), "ascii")) 993 self.assertEqual(smtp.expn(listname), expected_known) 994 995 u = 'PSU-Members-List' 996 expected_unknown = (550, b'No access for you!') 997 self.assertEqual(smtp.expn(u), expected_unknown) 998 smtp.quit() 999 1000 def testAUTH_PLAIN(self): 1001 self.serv.add_feature("AUTH PLAIN") 1002 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1003 resp = smtp.login(sim_auth[0], sim_auth[1]) 1004 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1005 smtp.close() 1006 1007 def testAUTH_LOGIN(self): 1008 self.serv.add_feature("AUTH LOGIN") 1009 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1010 resp = smtp.login(sim_auth[0], sim_auth[1]) 1011 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1012 smtp.close() 1013 1014 @requires_hashdigest('md5') 1015 def testAUTH_CRAM_MD5(self): 1016 self.serv.add_feature("AUTH CRAM-MD5") 1017 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1018 resp = smtp.login(sim_auth[0], sim_auth[1]) 1019 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1020 smtp.close() 1021 1022 def testAUTH_multiple(self): 1023 # Test that multiple authentication methods are tried. 1024 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1025 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1026 resp = smtp.login(sim_auth[0], sim_auth[1]) 1027 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1028 smtp.close() 1029 1030 def test_auth_function(self): 1031 supported = {'PLAIN', 'LOGIN'} 1032 try: 1033 hashlib.md5() 1034 except ValueError: 1035 pass 1036 else: 1037 supported.add('CRAM-MD5') 1038 for mechanism in supported: 1039 self.serv.add_feature("AUTH {}".format(mechanism)) 1040 for mechanism in supported: 1041 with self.subTest(mechanism=mechanism): 1042 smtp = smtplib.SMTP(HOST, self.port, 1043 local_hostname='localhost', timeout=15) 1044 smtp.ehlo('foo') 1045 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1046 method = 'auth_' + mechanism.lower().replace('-', '_') 1047 resp = smtp.auth(mechanism, getattr(smtp, method)) 1048 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1049 smtp.close() 1050 1051 def test_quit_resets_greeting(self): 1052 smtp = smtplib.SMTP(HOST, self.port, 1053 local_hostname='localhost', 1054 timeout=15) 1055 code, message = smtp.ehlo() 1056 self.assertEqual(code, 250) 1057 self.assertIn('size', smtp.esmtp_features) 1058 smtp.quit() 1059 self.assertNotIn('size', smtp.esmtp_features) 1060 smtp.connect(HOST, self.port) 1061 self.assertNotIn('size', smtp.esmtp_features) 1062 smtp.ehlo_or_helo_if_needed() 1063 self.assertIn('size', smtp.esmtp_features) 1064 smtp.quit() 1065 1066 def test_with_statement(self): 1067 with smtplib.SMTP(HOST, self.port) as smtp: 1068 code, message = smtp.noop() 1069 self.assertEqual(code, 250) 1070 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1071 with smtplib.SMTP(HOST, self.port) as smtp: 1072 smtp.close() 1073 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1074 1075 def test_with_statement_QUIT_failure(self): 1076 with self.assertRaises(smtplib.SMTPResponseException) as error: 1077 with smtplib.SMTP(HOST, self.port) as smtp: 1078 smtp.noop() 1079 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1080 self.assertEqual(error.exception.smtp_code, 421) 1081 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1082 1083 #TODO: add tests for correct AUTH method fallback now that the 1084 #test infrastructure can support it. 1085 1086 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1087 def test__rest_from_mail_cmd(self): 1088 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1089 smtp.noop() 1090 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1091 self.serv._SMTPchannel.disconnect = True 1092 with self.assertRaises(smtplib.SMTPSenderRefused): 1093 smtp.sendmail('John', 'Sally', 'test message') 1094 self.assertIsNone(smtp.sock) 1095 1096 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1097 def test_421_from_mail_cmd(self): 1098 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1099 smtp.noop() 1100 self.serv._SMTPchannel.mail_response = '421 closing connection' 1101 with self.assertRaises(smtplib.SMTPSenderRefused): 1102 smtp.sendmail('John', 'Sally', 'test message') 1103 self.assertIsNone(smtp.sock) 1104 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1105 1106 def test_421_from_rcpt_cmd(self): 1107 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1108 smtp.noop() 1109 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1110 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1111 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1112 self.assertIsNone(smtp.sock) 1113 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1114 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1115 1116 def test_421_from_data_cmd(self): 1117 class MySimSMTPChannel(SimSMTPChannel): 1118 def found_terminator(self): 1119 if self.smtp_state == self.DATA: 1120 self.push('421 closing') 1121 else: 1122 super().found_terminator() 1123 self.serv.channel_class = MySimSMTPChannel 1124 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1125 smtp.noop() 1126 with self.assertRaises(smtplib.SMTPDataError): 1127 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1128 self.assertIsNone(smtp.sock) 1129 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1130 1131 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1132 smtp = smtplib.SMTP( 1133 HOST, self.port, local_hostname='localhost', timeout=3) 1134 self.addCleanup(smtp.close) 1135 smtp.ehlo() 1136 self.assertTrue(smtp.does_esmtp) 1137 self.assertFalse(smtp.has_extn('smtputf8')) 1138 self.assertRaises( 1139 smtplib.SMTPNotSupportedError, 1140 smtp.sendmail, 1141 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1142 self.assertRaises( 1143 smtplib.SMTPNotSupportedError, 1144 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1145 1146 def test_send_unicode_without_SMTPUTF8(self): 1147 smtp = smtplib.SMTP( 1148 HOST, self.port, local_hostname='localhost', timeout=3) 1149 self.addCleanup(smtp.close) 1150 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1151 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1152 1153 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1154 # This test is located here and not in the SMTPUTF8SimTests 1155 # class because it needs a "regular" SMTP server to work 1156 msg = EmailMessage() 1157 msg['From'] = "Páolo <főo@bar.com>" 1158 msg['To'] = 'Dinsdale' 1159 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1160 smtp = smtplib.SMTP( 1161 HOST, self.port, local_hostname='localhost', timeout=3) 1162 self.addCleanup(smtp.close) 1163 with self.assertRaises(smtplib.SMTPNotSupportedError): 1164 smtp.send_message(msg) 1165 1166 def test_name_field_not_included_in_envelop_addresses(self): 1167 smtp = smtplib.SMTP( 1168 HOST, self.port, local_hostname='localhost', timeout=3 1169 ) 1170 self.addCleanup(smtp.close) 1171 1172 message = EmailMessage() 1173 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1174 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1175 1176 self.assertDictEqual(smtp.send_message(message), {}) 1177 1178 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1179 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1180 1181 1182class SimSMTPUTF8Server(SimSMTPServer): 1183 1184 def __init__(self, *args, **kw): 1185 # The base SMTP server turns these on automatically, but our test 1186 # server is set up to munge the EHLO response, so we need to provide 1187 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1188 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1189 smtpd.SMTPServer.__init__(self, *args, **kw) 1190 1191 def handle_accepted(self, conn, addr): 1192 self._SMTPchannel = self.channel_class( 1193 self._extra_features, self, conn, addr, 1194 decode_data=self._decode_data, 1195 enable_SMTPUTF8=self.enable_SMTPUTF8, 1196 ) 1197 1198 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1199 rcpt_options=None): 1200 self.last_peer = peer 1201 self.last_mailfrom = mailfrom 1202 self.last_rcpttos = rcpttos 1203 self.last_message = data 1204 self.last_mail_options = mail_options 1205 self.last_rcpt_options = rcpt_options 1206 1207 1208class SMTPUTF8SimTests(unittest.TestCase): 1209 1210 maxDiff = None 1211 1212 def setUp(self): 1213 self.thread_key = threading_setup() 1214 self.real_getfqdn = socket.getfqdn 1215 socket.getfqdn = mock_socket.getfqdn 1216 self.serv_evt = threading.Event() 1217 self.client_evt = threading.Event() 1218 # Pick a random unused port by passing 0 for the port number 1219 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1220 decode_data=False, 1221 enable_SMTPUTF8=True) 1222 # Keep a note of what port was assigned 1223 self.port = self.serv.socket.getsockname()[1] 1224 serv_args = (self.serv, self.serv_evt, self.client_evt) 1225 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1226 self.thread.start() 1227 1228 # wait until server thread has assigned a port number 1229 self.serv_evt.wait() 1230 self.serv_evt.clear() 1231 1232 def tearDown(self): 1233 socket.getfqdn = self.real_getfqdn 1234 # indicate that the client is finished 1235 self.client_evt.set() 1236 # wait for the server thread to terminate 1237 self.serv_evt.wait() 1238 join_thread(self.thread) 1239 del self.thread 1240 self.doCleanups() 1241 threading_cleanup(*self.thread_key) 1242 1243 def test_test_server_supports_extensions(self): 1244 smtp = smtplib.SMTP( 1245 HOST, self.port, local_hostname='localhost', timeout=3) 1246 self.addCleanup(smtp.close) 1247 smtp.ehlo() 1248 self.assertTrue(smtp.does_esmtp) 1249 self.assertTrue(smtp.has_extn('smtputf8')) 1250 1251 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1252 m = '¡a test message containing unicode!'.encode('utf-8') 1253 smtp = smtplib.SMTP( 1254 HOST, self.port, local_hostname='localhost', timeout=3) 1255 self.addCleanup(smtp.close) 1256 smtp.sendmail('Jőhn', 'Sálly', m, 1257 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1258 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1259 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1260 self.assertEqual(self.serv.last_message, m) 1261 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1262 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1263 self.assertEqual(self.serv.last_rcpt_options, []) 1264 1265 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1266 m = '¡a test message containing unicode!'.encode('utf-8') 1267 smtp = smtplib.SMTP( 1268 HOST, self.port, local_hostname='localhost', timeout=3) 1269 self.addCleanup(smtp.close) 1270 smtp.ehlo() 1271 self.assertEqual( 1272 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1273 (250, b'OK')) 1274 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1275 self.assertEqual(smtp.data(m), (250, b'OK')) 1276 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1277 self.assertEqual(self.serv.last_rcpttos, ['János']) 1278 self.assertEqual(self.serv.last_message, m) 1279 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1280 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1281 self.assertEqual(self.serv.last_rcpt_options, []) 1282 1283 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1284 msg = EmailMessage() 1285 msg['From'] = "Páolo <főo@bar.com>" 1286 msg['To'] = 'Dinsdale' 1287 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1288 # XXX I don't know why I need two \n's here, but this is an existing 1289 # bug (if it is one) and not a problem with the new functionality. 1290 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1291 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1292 # we are successfully sending /r/n :(. 1293 expected = textwrap.dedent("""\ 1294 From: Páolo <főo@bar.com> 1295 To: Dinsdale 1296 Subject: Nudge nudge, wink, wink \u1F609 1297 Content-Type: text/plain; charset="utf-8" 1298 Content-Transfer-Encoding: 8bit 1299 MIME-Version: 1.0 1300 1301 oh là là, know what I mean, know what I mean? 1302 """) 1303 smtp = smtplib.SMTP( 1304 HOST, self.port, local_hostname='localhost', timeout=3) 1305 self.addCleanup(smtp.close) 1306 self.assertEqual(smtp.send_message(msg), {}) 1307 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1308 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1309 self.assertEqual(self.serv.last_message.decode(), expected) 1310 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1311 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1312 self.assertEqual(self.serv.last_rcpt_options, []) 1313 1314 1315EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1316 1317class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1318 def smtp_AUTH(self, arg): 1319 # RFC 4954's AUTH command allows for an optional initial-response. 1320 # Not all AUTH methods support this; some require a challenge. AUTH 1321 # PLAIN does those, so test that here. See issue #15014. 1322 args = arg.split() 1323 if args[0].lower() == 'plain': 1324 if len(args) == 2: 1325 # AUTH PLAIN <initial-response> with the response base 64 1326 # encoded. Hard code the expected response for the test. 1327 if args[1] == EXPECTED_RESPONSE: 1328 self.push('235 Ok') 1329 return 1330 self.push('571 Bad authentication') 1331 1332class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1333 channel_class = SimSMTPAUTHInitialResponseChannel 1334 1335 1336class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1337 def setUp(self): 1338 self.thread_key = threading_setup() 1339 self.real_getfqdn = socket.getfqdn 1340 socket.getfqdn = mock_socket.getfqdn 1341 self.serv_evt = threading.Event() 1342 self.client_evt = threading.Event() 1343 # Pick a random unused port by passing 0 for the port number 1344 self.serv = SimSMTPAUTHInitialResponseServer( 1345 (HOST, 0), ('nowhere', -1), decode_data=True) 1346 # Keep a note of what port was assigned 1347 self.port = self.serv.socket.getsockname()[1] 1348 serv_args = (self.serv, self.serv_evt, self.client_evt) 1349 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1350 self.thread.start() 1351 1352 # wait until server thread has assigned a port number 1353 self.serv_evt.wait() 1354 self.serv_evt.clear() 1355 1356 def tearDown(self): 1357 socket.getfqdn = self.real_getfqdn 1358 # indicate that the client is finished 1359 self.client_evt.set() 1360 # wait for the server thread to terminate 1361 self.serv_evt.wait() 1362 join_thread(self.thread) 1363 del self.thread 1364 self.doCleanups() 1365 threading_cleanup(*self.thread_key) 1366 1367 def testAUTH_PLAIN_initial_response_login(self): 1368 self.serv.add_feature('AUTH PLAIN') 1369 smtp = smtplib.SMTP(HOST, self.port, 1370 local_hostname='localhost', timeout=15) 1371 smtp.login('psu', 'doesnotexist') 1372 smtp.close() 1373 1374 def testAUTH_PLAIN_initial_response_auth(self): 1375 self.serv.add_feature('AUTH PLAIN') 1376 smtp = smtplib.SMTP(HOST, self.port, 1377 local_hostname='localhost', timeout=15) 1378 smtp.user = 'psu' 1379 smtp.password = 'doesnotexist' 1380 code, response = smtp.auth('plain', smtp.auth_plain) 1381 smtp.close() 1382 self.assertEqual(code, 235) 1383 1384 1385if __name__ == '__main__': 1386 unittest.main() 1387