• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import email.mime.text
3from email.message import EmailMessage
4from email.base64mime import body_encode as encode_base64
5import email.utils
6import hashlib
7import hmac
8import socket
9import smtplib
10import io
11import re
12import sys
13import time
14import select
15import errno
16import textwrap
17import threading
18
19import unittest
20from test import support, mock_socket
21from test.support import hashlib_helper
22from test.support import socket_helper
23from test.support import threading_helper
24from test.support import asyncore
25from test.support import smtpd
26from unittest.mock import Mock
27
28
# Every test in this module needs a working socket; skip the whole module
# on platforms that cannot provide one.
support.requires_working_socket(module=True)

# Host used by the servers and clients in this module.
HOST = socket_helper.HOST

if sys.platform == 'darwin':
    def handle_expt(self):
        """Ignore the exceptional-condition event.

        On darwin select.poll returns a select.POLLHUP at the end of the
        tests, so the event is simply dropped.
        """
    smtpd.SMTPChannel.handle_expt = handle_expt
39
40
def server(evt, buf, serv):
    """Send *buf* to the first client that connects to *serv*, then shut down.

    evt  -- event object; set once the listener is ready and set again when
            the function is finished.
    buf  -- bytes to write to the accepted connection.
    serv -- bound listening socket; it is always closed before returning.

    A TimeoutError raised by accept() is swallowed (no client showed up).
    At most 500 select()/send() rounds are attempted, so a stuck peer cannot
    keep the thread alive forever.  The accepted connection is closed even
    when select() or send() raises, so a failing client does not leak it.
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except TimeoutError:
        pass
    else:
        try:
            for _ in range(500):
                if not buf:
                    break
                # Only the write list is passed, so this waits until the
                # connection can take more data.
                r, w, e = select.select([], [conn], [])
                if w:
                    sent = conn.send(buf)
                    buf = buf[sent:]
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
62
class GeneralTests:
    """Connection-level checks shared by the concrete client test classes.

    Subclasses supply the ``client`` attribute (the class under test).
    smtplib's socket module is replaced by ``mock_socket`` for each test.
    """

    def setUp(self):
        # Route smtplib's socket use through the mock module.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        # Hand the real socket module back to smtplib.
        smtplib.socket = socket

    # quotedata() is no longer used but is retained for backward
    # compatibility, so test to make sure it still works.
    def testQuoteData(self):
        raw = "abc\n.jkl\rfoo\r\n..blue"
        quoted = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(quoted, smtplib.quotedata(raw))

    def testBasic1(self):
        # Connect with a separate host and port.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        # The source_address given to the constructor is kept on the client.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port,
                           source_address=('127.0.0.1', 19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        # Connect with the port embedded in the host name.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        # A supplied local_hostname must be the one that is used.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        # Without an explicit timeout the (mock) module default applies.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = self.client(HOST, self.port)
        finally:
            # Always undo the module-wide default.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        # timeout=None must result in a socket without a timeout.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = self.client(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutZero(self):
        # A zero timeout is rejected with ValueError.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertRaises(ValueError, self.client,
                          HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        # An explicit timeout ends up on the socket.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        # Debug level 1 writes "connect:" lines to stderr.
        mock_socket.reply_with(b"220 Hello world")
        smtp = self.client()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as captured:
            smtp.connect(HOST, self.port)
        smtp.close()
        self.assertRegex(captured.getvalue(),
                         re.compile(r"^connect:", re.MULTILINE))

    def test_debuglevel_2(self):
        # Debug level 2 prefixes each line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        smtp = self.client()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as captured:
            smtp.connect(HOST, self.port)
        smtp.close()
        timestamped = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                                 re.MULTILINE)
        self.assertRegex(captured.getvalue(), timestamped)
160
161
class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the shared GeneralTests checks against smtplib.SMTP."""

    client = smtplib.SMTP
165
166
class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the shared GeneralTests checks against smtplib.LMTP.

    Adds cases where the "host" is a filesystem path, as used for
    Unix domain sockets.
    """

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        # Connecting to a socket path must leave the socket without a timeout.
        socket_path = '/some/local/lmtp/delivery/program'
        mock_socket.reply_with(b"220 Hello world")
        try:
            lmtp = self.client(socket_path, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(lmtp.sock.gettimeout())
        lmtp.close()

    def testTimeoutZero(self):
        # In addition to the shared check, a zero timeout is also rejected
        # when the host is a socket path.
        super().testTimeoutZero()
        socket_path = '/some/local/lmtp/delivery/program'
        self.assertRaises(ValueError, self.client, socket_path, timeout=0)
187
188# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Thread target that pumps the asyncore loop for the server *serv*.

    serv_evt is set once on entry and once more when the function is done.
    client_evt is set by the test when the client conversation is over; the
    loop then closes *serv* and stops.  The loop also stops after 1000 polls
    or as soon as asyncore has no sockets left.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based loop where the platform offers poll().
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break
    except TimeoutError:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
219
# Marker lines that the tests expect around each message in the captured
# DebuggingServer output.
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
222
223# NOTE: Some SMTP objects in the tests below are created with a non-default
224# local_hostname argument to the constructor, since (on some systems) the FQDN
225# lookup caused by the default local_hostname sometimes takes so long that the
226# test server times out, causing the test to fail.
227
228# Test behavior of smtpd.DebuggingServer
229class DebuggingServerTests(unittest.TestCase):
230
231    maxDiff = None
232
233    def setUp(self):
234        self.thread_key = threading_helper.threading_setup()
235        self.real_getfqdn = socket.getfqdn
236        socket.getfqdn = mock_socket.getfqdn
237        # temporarily replace sys.stdout to capture DebuggingServer output
238        self.old_stdout = sys.stdout
239        self.output = io.StringIO()
240        sys.stdout = self.output
241
242        self.serv_evt = threading.Event()
243        self.client_evt = threading.Event()
244        # Capture SMTPChannel debug output
245        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
246        smtpd.DEBUGSTREAM = io.StringIO()
247        # Pick a random unused port by passing 0 for the port number
248        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
249                                          decode_data=True)
250        # Keep a note of what server host and port were assigned
251        self.host, self.port = self.serv.socket.getsockname()[:2]
252        serv_args = (self.serv, self.serv_evt, self.client_evt)
253        self.thread = threading.Thread(target=debugging_server, args=serv_args)
254        self.thread.start()
255
256        # wait until server thread has assigned a port number
257        self.serv_evt.wait()
258        self.serv_evt.clear()
259
260    def tearDown(self):
261        socket.getfqdn = self.real_getfqdn
262        # indicate that the client is finished
263        self.client_evt.set()
264        # wait for the server thread to terminate
265        self.serv_evt.wait()
266        threading_helper.join_thread(self.thread)
267        # restore sys.stdout
268        sys.stdout = self.old_stdout
269        # restore DEBUGSTREAM
270        smtpd.DEBUGSTREAM.close()
271        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
272        del self.thread
273        self.doCleanups()
274        threading_helper.threading_cleanup(*self.thread_key)
275
276    def get_output_without_xpeer(self):
277        test_output = self.output.getvalue()
278        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
279                      test_output, flags=re.MULTILINE|re.DOTALL)
280
281    def testBasic(self):
282        # connect
283        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
284                            timeout=support.LOOPBACK_TIMEOUT)
285        smtp.quit()
286
287    def testSourceAddress(self):
288        # connect
289        src_port = socket_helper.find_unused_port()
290        try:
291            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
292                                timeout=support.LOOPBACK_TIMEOUT,
293                                source_address=(self.host, src_port))
294            self.addCleanup(smtp.close)
295            self.assertEqual(smtp.source_address, (self.host, src_port))
296            self.assertEqual(smtp.local_hostname, 'localhost')
297            smtp.quit()
298        except OSError as e:
299            if e.errno == errno.EADDRINUSE:
300                self.skipTest("couldn't bind to source port %d" % src_port)
301            raise
302
303    def testNOOP(self):
304        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
305                            timeout=support.LOOPBACK_TIMEOUT)
306        self.addCleanup(smtp.close)
307        expected = (250, b'OK')
308        self.assertEqual(smtp.noop(), expected)
309        smtp.quit()
310
311    def testRSET(self):
312        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
313                            timeout=support.LOOPBACK_TIMEOUT)
314        self.addCleanup(smtp.close)
315        expected = (250, b'OK')
316        self.assertEqual(smtp.rset(), expected)
317        smtp.quit()
318
319    def testELHO(self):
320        # EHLO isn't implemented in DebuggingServer
321        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
322                            timeout=support.LOOPBACK_TIMEOUT)
323        self.addCleanup(smtp.close)
324        expected = (250, b'\nSIZE 33554432\nHELP')
325        self.assertEqual(smtp.ehlo(), expected)
326        smtp.quit()
327
328    def testEXPNNotImplemented(self):
329        # EXPN isn't implemented in DebuggingServer
330        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
331                            timeout=support.LOOPBACK_TIMEOUT)
332        self.addCleanup(smtp.close)
333        expected = (502, b'EXPN not implemented')
334        smtp.putcmd('EXPN')
335        self.assertEqual(smtp.getreply(), expected)
336        smtp.quit()
337
338    def test_issue43124_putcmd_escapes_newline(self):
339        # see: https://bugs.python.org/issue43124
340        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
341                            timeout=support.LOOPBACK_TIMEOUT)
342        self.addCleanup(smtp.close)
343        with self.assertRaises(ValueError) as exc:
344            smtp.putcmd('helo\nX-INJECTED')
345        self.assertIn("prohibited newline characters", str(exc.exception))
346        smtp.quit()
347
348    def testVRFY(self):
349        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
350                            timeout=support.LOOPBACK_TIMEOUT)
351        self.addCleanup(smtp.close)
352        expected = (252, b'Cannot VRFY user, but will accept message ' + \
353                         b'and attempt delivery')
354        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
355        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
356        smtp.quit()
357
358    def testSecondHELO(self):
359        # check that a second HELO returns a message that it's a duplicate
360        # (this behavior is specific to smtpd.SMTPChannel)
361        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
362                            timeout=support.LOOPBACK_TIMEOUT)
363        self.addCleanup(smtp.close)
364        smtp.helo()
365        expected = (503, b'Duplicate HELO/EHLO')
366        self.assertEqual(smtp.helo(), expected)
367        smtp.quit()
368
369    def testHELP(self):
370        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
371                            timeout=support.LOOPBACK_TIMEOUT)
372        self.addCleanup(smtp.close)
373        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
374                                      b'RCPT DATA RSET NOOP QUIT VRFY')
375        smtp.quit()
376
377    def testSend(self):
378        # connect and send mail
379        m = 'A test message'
380        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
381                            timeout=support.LOOPBACK_TIMEOUT)
382        self.addCleanup(smtp.close)
383        smtp.sendmail('John', 'Sally', m)
384        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
385        # in asyncore.  This sleep might help, but should really be fixed
386        # properly by using an Event variable.
387        time.sleep(0.01)
388        smtp.quit()
389
390        self.client_evt.set()
391        self.serv_evt.wait()
392        self.output.flush()
393        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
394        self.assertEqual(self.output.getvalue(), mexpect)
395
396    def testSendBinary(self):
397        m = b'A test message'
398        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
399                            timeout=support.LOOPBACK_TIMEOUT)
400        self.addCleanup(smtp.close)
401        smtp.sendmail('John', 'Sally', m)
402        # XXX (see comment in testSend)
403        time.sleep(0.01)
404        smtp.quit()
405
406        self.client_evt.set()
407        self.serv_evt.wait()
408        self.output.flush()
409        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
410        self.assertEqual(self.output.getvalue(), mexpect)
411
412    def testSendNeedingDotQuote(self):
413        # Issue 12283
414        m = '.A test\n.mes.sage.'
415        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
416                            timeout=support.LOOPBACK_TIMEOUT)
417        self.addCleanup(smtp.close)
418        smtp.sendmail('John', 'Sally', m)
419        # XXX (see comment in testSend)
420        time.sleep(0.01)
421        smtp.quit()
422
423        self.client_evt.set()
424        self.serv_evt.wait()
425        self.output.flush()
426        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
427        self.assertEqual(self.output.getvalue(), mexpect)
428
429    def test_issue43124_escape_localhostname(self):
430        # see: https://bugs.python.org/issue43124
431        # connect and send mail
432        m = 'wazzuuup\nlinetwo'
433        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
434                            timeout=support.LOOPBACK_TIMEOUT)
435        self.addCleanup(smtp.close)
436        with self.assertRaises(ValueError) as exc:
437            smtp.sendmail("hi@me.com", "you@me.com", m)
438        self.assertIn(
439            "prohibited newline characters: ehlo hi\\nX-INJECTED",
440            str(exc.exception),
441        )
442        # XXX (see comment in testSend)
443        time.sleep(0.01)
444        smtp.quit()
445
446        debugout = smtpd.DEBUGSTREAM.getvalue()
447        self.assertNotIn("X-INJECTED", debugout)
448
449    def test_issue43124_escape_options(self):
450        # see: https://bugs.python.org/issue43124
451        # connect and send mail
452        m = 'wazzuuup\nlinetwo'
453        smtp = smtplib.SMTP(
454            HOST, self.port, local_hostname='localhost',
455            timeout=support.LOOPBACK_TIMEOUT)
456
457        self.addCleanup(smtp.close)
458        smtp.sendmail("hi@me.com", "you@me.com", m)
459        with self.assertRaises(ValueError) as exc:
460            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
461        msg = str(exc.exception)
462        self.assertIn("prohibited newline characters", msg)
463        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
464        # XXX (see comment in testSend)
465        time.sleep(0.01)
466        smtp.quit()
467
468        debugout = smtpd.DEBUGSTREAM.getvalue()
469        self.assertNotIn("X-OPTION", debugout)
470        self.assertNotIn("X-OPTION2", debugout)
471        self.assertNotIn("X-INJECTED-1", debugout)
472        self.assertNotIn("X-INJECTED-2", debugout)
473
474    def testSendNullSender(self):
475        m = 'A test message'
476        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
477                            timeout=support.LOOPBACK_TIMEOUT)
478        self.addCleanup(smtp.close)
479        smtp.sendmail('<>', 'Sally', m)
480        # XXX (see comment in testSend)
481        time.sleep(0.01)
482        smtp.quit()
483
484        self.client_evt.set()
485        self.serv_evt.wait()
486        self.output.flush()
487        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
488        self.assertEqual(self.output.getvalue(), mexpect)
489        debugout = smtpd.DEBUGSTREAM.getvalue()
490        sender = re.compile("^sender: <>$", re.MULTILINE)
491        self.assertRegex(debugout, sender)
492
493    def testSendMessage(self):
494        m = email.mime.text.MIMEText('A test message')
495        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
496                            timeout=support.LOOPBACK_TIMEOUT)
497        self.addCleanup(smtp.close)
498        smtp.send_message(m, from_addr='John', to_addrs='Sally')
499        # XXX (see comment in testSend)
500        time.sleep(0.01)
501        smtp.quit()
502
503        self.client_evt.set()
504        self.serv_evt.wait()
505        self.output.flush()
506        # Remove the X-Peer header that DebuggingServer adds as figuring out
507        # exactly what IP address format is put there is not easy (and
508        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
509        # not always the same as socket.gethostbyname(HOST). :(
510        test_output = self.get_output_without_xpeer()
511        del m['X-Peer']
512        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
513        self.assertEqual(test_output, mexpect)
514
515    def testSendMessageWithAddresses(self):
516        m = email.mime.text.MIMEText('A test message')
517        m['From'] = 'foo@bar.com'
518        m['To'] = 'John'
519        m['CC'] = 'Sally, Fred'
520        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
521        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
522                            timeout=support.LOOPBACK_TIMEOUT)
523        self.addCleanup(smtp.close)
524        smtp.send_message(m)
525        # XXX (see comment in testSend)
526        time.sleep(0.01)
527        smtp.quit()
528        # make sure the Bcc header is still in the message.
529        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
530                                    '<warped@silly.walks.com>')
531
532        self.client_evt.set()
533        self.serv_evt.wait()
534        self.output.flush()
535        # Remove the X-Peer header that DebuggingServer adds.
536        test_output = self.get_output_without_xpeer()
537        del m['X-Peer']
538        # The Bcc header should not be transmitted.
539        del m['Bcc']
540        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
541        self.assertEqual(test_output, mexpect)
542        debugout = smtpd.DEBUGSTREAM.getvalue()
543        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
544        self.assertRegex(debugout, sender)
545        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
546                     'warped@silly.walks.com'):
547            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
548                                 re.MULTILINE)
549            self.assertRegex(debugout, to_addr)
550
551    def testSendMessageWithSomeAddresses(self):
552        # Make sure nothing breaks if not all of the three 'to' headers exist
553        m = email.mime.text.MIMEText('A test message')
554        m['From'] = 'foo@bar.com'
555        m['To'] = 'John, Dinsdale'
556        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
557                            timeout=support.LOOPBACK_TIMEOUT)
558        self.addCleanup(smtp.close)
559        smtp.send_message(m)
560        # XXX (see comment in testSend)
561        time.sleep(0.01)
562        smtp.quit()
563
564        self.client_evt.set()
565        self.serv_evt.wait()
566        self.output.flush()
567        # Remove the X-Peer header that DebuggingServer adds.
568        test_output = self.get_output_without_xpeer()
569        del m['X-Peer']
570        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
571        self.assertEqual(test_output, mexpect)
572        debugout = smtpd.DEBUGSTREAM.getvalue()
573        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
574        self.assertRegex(debugout, sender)
575        for addr in ('John', 'Dinsdale'):
576            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
577                                 re.MULTILINE)
578            self.assertRegex(debugout, to_addr)
579
580    def testSendMessageWithSpecifiedAddresses(self):
581        # Make sure addresses specified in call override those in message.
582        m = email.mime.text.MIMEText('A test message')
583        m['From'] = 'foo@bar.com'
584        m['To'] = 'John, Dinsdale'
585        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
586                            timeout=support.LOOPBACK_TIMEOUT)
587        self.addCleanup(smtp.close)
588        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
589        # XXX (see comment in testSend)
590        time.sleep(0.01)
591        smtp.quit()
592
593        self.client_evt.set()
594        self.serv_evt.wait()
595        self.output.flush()
596        # Remove the X-Peer header that DebuggingServer adds.
597        test_output = self.get_output_without_xpeer()
598        del m['X-Peer']
599        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
600        self.assertEqual(test_output, mexpect)
601        debugout = smtpd.DEBUGSTREAM.getvalue()
602        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
603        self.assertRegex(debugout, sender)
604        for addr in ('John', 'Dinsdale'):
605            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
606                                 re.MULTILINE)
607            self.assertNotRegex(debugout, to_addr)
608        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
609        self.assertRegex(debugout, recip)
610
611    def testSendMessageWithMultipleFrom(self):
612        # Sender overrides To
613        m = email.mime.text.MIMEText('A test message')
614        m['From'] = 'Bernard, Bianca'
615        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
616        m['To'] = 'John, Dinsdale'
617        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
618                            timeout=support.LOOPBACK_TIMEOUT)
619        self.addCleanup(smtp.close)
620        smtp.send_message(m)
621        # XXX (see comment in testSend)
622        time.sleep(0.01)
623        smtp.quit()
624
625        self.client_evt.set()
626        self.serv_evt.wait()
627        self.output.flush()
628        # Remove the X-Peer header that DebuggingServer adds.
629        test_output = self.get_output_without_xpeer()
630        del m['X-Peer']
631        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
632        self.assertEqual(test_output, mexpect)
633        debugout = smtpd.DEBUGSTREAM.getvalue()
634        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
635        self.assertRegex(debugout, sender)
636        for addr in ('John', 'Dinsdale'):
637            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
638                                 re.MULTILINE)
639            self.assertRegex(debugout, to_addr)
640
641    def testSendMessageResent(self):
642        m = email.mime.text.MIMEText('A test message')
643        m['From'] = 'foo@bar.com'
644        m['To'] = 'John'
645        m['CC'] = 'Sally, Fred'
646        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
647        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
648        m['Resent-From'] = 'holy@grail.net'
649        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
650        m['Resent-Bcc'] = 'doe@losthope.net'
651        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
652                            timeout=support.LOOPBACK_TIMEOUT)
653        self.addCleanup(smtp.close)
654        smtp.send_message(m)
655        # XXX (see comment in testSend)
656        time.sleep(0.01)
657        smtp.quit()
658
659        self.client_evt.set()
660        self.serv_evt.wait()
661        self.output.flush()
662        # The Resent-Bcc headers are deleted before serialization.
663        del m['Bcc']
664        del m['Resent-Bcc']
665        # Remove the X-Peer header that DebuggingServer adds.
666        test_output = self.get_output_without_xpeer()
667        del m['X-Peer']
668        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
669        self.assertEqual(test_output, mexpect)
670        debugout = smtpd.DEBUGSTREAM.getvalue()
671        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
672        self.assertRegex(debugout, sender)
673        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
674            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
675                                 re.MULTILINE)
676            self.assertRegex(debugout, to_addr)
677
678    def testSendMessageMultipleResentRaises(self):
679        m = email.mime.text.MIMEText('A test message')
680        m['From'] = 'foo@bar.com'
681        m['To'] = 'John'
682        m['CC'] = 'Sally, Fred'
683        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
684        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
685        m['Resent-From'] = 'holy@grail.net'
686        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
687        m['Resent-Bcc'] = 'doe@losthope.net'
688        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
689        m['Resent-To'] = 'holy@grail.net'
690        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
691        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
692                            timeout=support.LOOPBACK_TIMEOUT)
693        self.addCleanup(smtp.close)
694        with self.assertRaises(ValueError):
695            smtp.send_message(m)
696        smtp.close()
697
class NonConnectingTests(unittest.TestCase):
    """Checks on SMTP objects that never reach a server."""

    def testNotConnected(self):
        # Operations on an unconnected SMTP object must raise
        # SMTPServerDisconnected.
        unconnected = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            unconnected.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            unconnected.send('test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost", "bogus")
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost:bogus")

    def testSockAttributeExists(self):
        # check that sock attribute is present outside of a connect() call
        # (regression test, the previous behavior raised an
        #  AttributeError: 'SMTP' object has no attribute 'sock')
        with smtplib.SMTP() as unconnected:
            self.assertIsNone(unconnected.sock)
723
724
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options that send_message() hands on to sendmail().

    ehlo, has_extn and sendmail are replaced by Mock objects on the SMTP
    instance, so nothing is sent anywhere.
    """

    def setUp(self):
        message = EmailMessage()
        message['From'] = 'Páolo <főo@bar.com>'
        self.msg = message
        self.smtp = smtplib.SMTP()
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn = Mock()
        self.smtp.sendmail = Mock()

    def testSendMessage(self):
        # Sending twice must produce the same options both times.
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        recorded = self.smtp.sendmail.call_args_list
        for index in (0, 1):
            # [0] is the positional-argument tuple; item 3 of it holds the
            # mail options.
            self.assertEqual(recorded[index][0][3], expected_mail_options)

    def testSendMessageWithMailOptions(self):
        # The caller's list is extended in the call but not modified.
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        first_call = self.smtp.sendmail.call_args_list[0]
        self.assertEqual(first_call[0][3], expected_mail_options)
750
751
752# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """Client behavior when the server's reply is not a success."""

    def setUp(self):
        # The mock socket answers with a 199 reply.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        # Keep anything printed during the test off the real stdout.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        # Creating the client must fail with SMTPConnectError.
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
770
771
class TooLongLineTests(unittest.TestCase):
    # A single reply line padded with 2 * smtplib._MAXLINE dots, sent by
    # server() from a background thread over a real listening socket.
    respdata = b'250 OK' + b'.' * (2 * smtplib._MAXLINE) + b'\n'

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Capture stdout until tearDown() puts the original back.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = socket_helper.bind_port(self.sock)
        self.thread = threading.Thread(
            target=server, args=(self.evt, self.respdata, self.sock))
        self.thread.start()
        # server() sets the event once it is listening; clear it again so
        # tearDown() can wait for the set() issued when server() finishes.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Wait for server() to finish before undoing setUp().
        self.evt.wait()
        sys.stdout = self.old_stdout
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
802
803
# Mailbox address -> display name, as reported by the simulated VRFY/EXPN.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@xn--fo-fka.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# (user, password) pair the simulated AUTH handlers accept.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Challenge text sent for CRAM-MD5; the channel base64-decodes it before
# computing the expected digest.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)
# Mailing-list name -> member addresses (each a key of sim_users).
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@xn--fo-fka.com'],
}
815
816# Simulated SMTP channel & server
class ResponseException(Exception):
    """Exception caught by SimSMTPChannel while it is in its AUTH state.

    The handler there reads ``smtp_code`` and ``smtp_error`` from the caught
    instance; this class does not define them, so whoever raises it has to
    set both attributes.
    """
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd channel with scriptable replies and simulated AUTH support.

    Tests steer the channel by assigning to the ``*_response`` attributes
    below, and read the counters to check which commands the client sent.
    """

    # Canned replies.  While an attribute is None the matching command is
    # handled by smtpd.SMTPChannel; otherwise its value is pushed instead.
    quit_response = None
    mail_response = None
    rcpt_response = None    # sequence: one reply per RCPT command, in order
    data_response = None
    rcpt_count = 0          # RCPT commands answered from rcpt_response
    rset_count = 0          # RSET commands received
    disconnect = 0          # if true, close after pushing mail_response
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None   # set by _authenticated() on success

    def __init__(self, extra_features, *args, **kw):
        """Create the channel.

        extra_features is an iterable of ESMTP feature strings; each one is
        advertised as an extra "250-<feature>" line in the EHLO reply.  All
        other arguments are passed to smtpd.SMTPChannel unchanged.
        """
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(x) for x in extra_features)
        self.all_received_lines = []
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        """Handle a completed line, diverting it to AUTH when in that state.

        Outside the AUTH state the pending lines are recorded in
        all_received_lines and then processed by smtpd.SMTPChannel.
        """
        if self.smtp_state == self.AUTH:
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                # NOTE(review): the raiser must have set smtp_code and
                # smtp_error on the exception; ResponseException itself
                # does not define them.
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            return
        self.all_received_lines.append(self.received_lines)
        super().found_terminator()

    def smtp_AUTH(self, arg):
        """Handle ``AUTH <mechanism> [initial-response]``.

        The mechanism is mapped to the ``_auth_<mechanism>`` method (lower
        case, dashes turned into underscores), which is then called with the
        initial response, or None when the client did not send one.
        """
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if len(args) not in [1, 2]:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      'authentication mechanism {!r}'.format(auth_object_name))
            return
        # Enter the AUTH state before calling the handler: the handler (or
        # _authenticated) switches back to COMMAND once the exchange ends.
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        """Send the final AUTH reply and return to the COMMAND state."""
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        """Return the text encoded by the ASCII base64 str *string*."""
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        """PLAIN: request the response if it did not come with AUTH."""
        if arg is None:
            self.push('334 ')
            return
        logpass = self._decode_base64(arg)
        try:
            # The last two NUL-separated fields are user and password.
            *_, user, password = logpass.split('\0')
        except ValueError as e:
            # The exchange is over: leave the AUTH state, as
            # _authenticated() does, so the next line is parsed as a
            # command again instead of as more authentication data.
            self.smtp_state = self.COMMAND
            self.push('535 Splitting response {!r} into user and password'
                      ' failed: {}'.format(logpass, e))
            return
        self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        """LOGIN: collect the user name, then the password.

        The user name is parked in ``_auth_login_user`` between the two
        steps; the attribute's presence tells which step is in progress.
        """
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_buggy(self, arg=None):
        # This AUTH mechanism will 'trap' client in a neverending 334
        # base64 encoded 'BuGgYbUgGy'
        self.push('334 QnVHZ1liVWdHeQ==')

    def _auth_cram_md5(self, arg=None):
        """CRAM-MD5: send the challenge, then check the returned digest.

        The decoded response must consist of two whitespace-separated
        fields: the user name and the hex HMAC-MD5 of the decoded challenge
        keyed with the password.
        """
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
            return
        logpass = self._decode_base64(arg)
        try:
            user, hashed_pass = logpass.split()
        except ValueError as e:
            # As in _auth_plain(): a malformed response ends the exchange,
            # so go back to the COMMAND state.
            self.smtp_state = self.COMMAND
            self.push('535 Splitting response {!r} into user and password '
                      'failed: {}'.format(logpass, e))
            return
        valid_hashed_pass = hmac.HMAC(
            sim_auth[1].encode('ascii'),
            self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
            'md5').hexdigest()
        self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        """Reply with a fixed feature list plus the configured extras."""
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        """List the members of a known list as a multi-line 250 reply."""
        list_name = arg.lower()
        if list_name not in sim_lists:
            self.push('550 No access for you!')
            return
        user_list = sim_lists[list_name]
        last = len(user_list) - 1
        for n, user_email in enumerate(user_list):
            # Every line but the last is a "250-" continuation line.
            separator = '-' if n < last else ' '
            self.push('250%s%s %s' % (separator, sim_users[user_email],
                                      smtplib.quoteaddr(user_email)))

    def smtp_QUIT(self, arg):
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # The n-th RCPT command gets the n-th canned reply.
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        # Count the command, then let smtpd handle it as usual.
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Re-raise the exception being handled.  Presumably this lets
        # channel failures propagate instead of being dealt with by the
        # base class -- confirm against asyncore's default handle_error.
        raise
1002
1003
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that creates a SimSMTPChannel for each connection.

    The most recent channel is kept in ``_SMTPchannel`` and the envelope of
    the most recent message in ``_addresses`` so tests can inspect both.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._addresses = {}
        self._extra_features = []
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # The channel is handed the feature list as it stands right now.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope is recorded; peer and data are ignored.
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        """Advertise *feature* to channels accepted from now on."""
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
1027
1028
1029# Test various SMTP & ESMTP commands/behaviors that require a simulated server
1030# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    # Every test runs against a SimSMTPServer driven by debugging_server()
    # in a background thread.  self.serv exposes the server and, once a
    # client has connected, its channel as self.serv._SMTPchannel.

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run the cleanups (client close() calls) before checking for
        # leftover threads.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def _connect(self):
        # Open a client connection to the simulated server.  close() is
        # registered as a cleanup so the socket is released even if an
        # assertion fails before the test reaches its own quit()/close();
        # calling close() a second time is harmless.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn': '',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        for listname, members in sim_lists.items():
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN_initial_response_ok(self):
        self.serv.add_feature("AUTH LOGIN")
        with self._connect() as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_LOGIN_initial_response_notok(self):
        self.serv.add_feature("AUTH LOGIN")
        with self._connect() as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_BUGGY(self):
        self.serv.add_feature("AUTH BUGGY")

        def auth_buggy(challenge=None):
            self.assertEqual(b"BuGgYbUgGy", challenge)
            return "\0"

        smtp = self._connect()
        try:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_buggy")
            expect = r"^Server AUTH mechanism infinite loop.*"
            with self.assertRaisesRegex(smtplib.SMTPException, expect):
                smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)
        finally:
            smtp.close()

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 is only exercised when hashlib can provide md5.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = self._connect()
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = self._connect()
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                # Answer the message body itself with a 421.
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = self._connect()
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        # As in the other 421 tests, the client must close without sending
        # RSET.  (rcpt_count is only touched when rcpt_response is set, so
        # checking it here could never fail.)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = self._connect()
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = self._connect()
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = self._connect()
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = self._connect()

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])

    def test_lowercase_mail_from_rcpt_to(self):
        m = 'A test message'
        smtp = self._connect()

        smtp.sendmail('John', 'Sally', m)

        self.assertIn(['mail from:<John> size=14'], self.serv._SMTPchannel.all_received_lines)
        self.assertIn(['rcpt to:<Sally>'], self.serv._SMTPchannel.all_received_lines)
1364
1365
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    Instead of the envelope dictionary kept by the parent class, every
    argument of the last process_message() call is stored in a ``last_*``
    attribute.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server turns these on automatically, but our test
        # server is set up to munge the EHLO response, so we need to provide
        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Same as the parent, plus the server's enable_SMTPUTF8 setting.
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                        rcpt_options=None):
        # Remember everything about the delivery for the tests to inspect.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1390
1391
class SMTPUTF8SimTests(unittest.TestCase):
    # Client-side SMTPUTF8 tests, run against a SimSMTPUTF8Server that
    # debugging_server() drives in a background thread.

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # socket.getfqdn is replaced by the mock until tearDown().
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPUTF8Server(
            (HOST, 0), ('nowhere', -1),
            decode_data=False, enable_SMTPUTF8=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def _connect(self):
        # Open a client connection and make sure it gets closed afterwards.
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def _check_recorded_options(self):
        # The server must have seen both MAIL options and no RCPT options.
        for option in ('BODY=8BITMIME', 'SMTPUTF8'):
            self.assertIn(option, self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_test_server_supports_extensions(self):
        smtp = self._connect()
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertTrue(smtp.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = self._connect()
        smtp.sendmail('Jőhn', 'Sálly', m,
                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, m)
        self._check_recorded_options()

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = self._connect()
        smtp.ehlo()
        accepted = (250, b'OK')
        self.assertEqual(
            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
            accepted)
        self.assertEqual(smtp.rcpt('János'), accepted)
        self.assertEqual(smtp.data(m), accepted)
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, m)
        self._check_recorded_options()

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        smtp = self._connect()
        self.assertEqual(smtp.send_message(msg), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self._check_recorded_options()
1501
1502
# Base64 form of the AUTH PLAIN initial response the simulated channel
# accepts: an empty authorization id, the user name and the password,
# separated by NUL bytes.
EXPECTED_RESPONSE = encode_base64(
    b'\0'.join((b'', b'psu', b'doesnotexist')), eol='')
1504
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    # Channel that only accepts "AUTH PLAIN <initial-response>" carrying the
    # hard-coded EXPECTED_RESPONSE; every other AUTH command is rejected.

    def smtp_AUTH(self, arg):
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN does support it, so test that here.  See issue #15014.
        parts = arg.split()
        mechanism = parts[0].lower()
        # Success requires exactly two tokens: the mechanism name and the
        # base64-encoded initial response expected by the test.
        accepted = (
            mechanism == 'plain'
            and len(parts) == 2
            and parts[1] == EXPECTED_RESPONSE
        )
        if accepted:
            self.push('235 Ok')
        else:
            self.push('571 Bad authentication')
1519
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    # Same simulated server, but connections are handled by the channel
    # that understands AUTH PLAIN with an initial response.
    channel_class = SimSMTPAUTHInitialResponseChannel
1522
1523
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    # Tests for AUTH PLAIN with an initial response, run against a
    # SimSMTPAUTHInitialResponseServer living in a background thread.

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Replace socket.getfqdn with the mock for the duration of the test;
        # tearDown restores the saved original.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # Block until the server thread signals serv_evt, then reset the
        # event so that tearDown can wait on it a second time.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run registered cleanups (closing the SMTP clients) explicitly so
        # they have finished before threading_cleanup() is called.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testAUTH_PLAIN_initial_response_login(self):
        # login() must authenticate by sending the credentials as the
        # initial response of AUTH PLAIN; it raises if the server refuses.
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Registered as a cleanup so the client socket is closed even when
        # login() raises.
        self.addCleanup(smtp.close)
        smtp.login('psu', 'doesnotexist')

    def testAUTH_PLAIN_initial_response_auth(self):
        # Same exchange, driven through the lower-level auth() call.
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Registered as a cleanup so the client socket is closed even when
        # auth() raises or the assertion below fails.
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, response = smtp.auth('plain', smtp.auth_plain)
        self.assertEqual(code, 235)
1571
1572
# Run the whole test module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1575