• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hmac
8import socket
9import smtpd
10import smtplib
11import io
12import re
13import sys
14import time
15import select
16import errno
17import textwrap
18import threading
19
20import unittest
21from test import support, mock_socket
22from test.support import HOST, HOSTv4, HOSTv6
23from unittest.mock import Mock
24
25
26if sys.platform == 'darwin':
27    # select.poll returns a select.POLLHUP at the end of the tests
28    # on darwin, so just ignore it
29    def handle_expt(self):
30        pass
31    smtpd.SMTPChannel.handle_expt = handle_expt
32
33
34def server(evt, buf, serv):
35    serv.listen()
36    evt.set()
37    try:
38        conn, addr = serv.accept()
39    except socket.timeout:
40        pass
41    else:
42        n = 500
43        while buf and n > 0:
44            r, w, e = select.select([], [conn], [])
45            if w:
46                sent = conn.send(buf)
47                buf = buf[sent:]
48
49            n -= 1
50
51        conn.close()
52    finally:
53        serv.close()
54        evt.set()
55
56class GeneralTests(unittest.TestCase):
57
58    def setUp(self):
59        smtplib.socket = mock_socket
60        self.port = 25
61
62    def tearDown(self):
63        smtplib.socket = socket
64
65    # This method is no longer used but is retained for backward compatibility,
66    # so test to make sure it still works.
67    def testQuoteData(self):
68        teststr  = "abc\n.jkl\rfoo\r\n..blue"
69        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
70        self.assertEqual(expected, smtplib.quotedata(teststr))
71
72    def testBasic1(self):
73        mock_socket.reply_with(b"220 Hola mundo")
74        # connects
75        smtp = smtplib.SMTP(HOST, self.port)
76        smtp.close()
77
78    def testSourceAddress(self):
79        mock_socket.reply_with(b"220 Hola mundo")
80        # connects
81        smtp = smtplib.SMTP(HOST, self.port,
82                source_address=('127.0.0.1',19876))
83        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
84        smtp.close()
85
86    def testBasic2(self):
87        mock_socket.reply_with(b"220 Hola mundo")
88        # connects, include port in host name
89        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
90        smtp.close()
91
92    def testLocalHostName(self):
93        mock_socket.reply_with(b"220 Hola mundo")
94        # check that supplied local_hostname is used
95        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
96        self.assertEqual(smtp.local_hostname, "testhost")
97        smtp.close()
98
99    def testTimeoutDefault(self):
100        mock_socket.reply_with(b"220 Hola mundo")
101        self.assertIsNone(mock_socket.getdefaulttimeout())
102        mock_socket.setdefaulttimeout(30)
103        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
104        try:
105            smtp = smtplib.SMTP(HOST, self.port)
106        finally:
107            mock_socket.setdefaulttimeout(None)
108        self.assertEqual(smtp.sock.gettimeout(), 30)
109        smtp.close()
110
111    def testTimeoutNone(self):
112        mock_socket.reply_with(b"220 Hola mundo")
113        self.assertIsNone(socket.getdefaulttimeout())
114        socket.setdefaulttimeout(30)
115        try:
116            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
117        finally:
118            socket.setdefaulttimeout(None)
119        self.assertIsNone(smtp.sock.gettimeout())
120        smtp.close()
121
122    def testTimeoutValue(self):
123        mock_socket.reply_with(b"220 Hola mundo")
124        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
125        self.assertEqual(smtp.sock.gettimeout(), 30)
126        smtp.close()
127
128    def test_debuglevel(self):
129        mock_socket.reply_with(b"220 Hello world")
130        smtp = smtplib.SMTP()
131        smtp.set_debuglevel(1)
132        with support.captured_stderr() as stderr:
133            smtp.connect(HOST, self.port)
134        smtp.close()
135        expected = re.compile(r"^connect:", re.MULTILINE)
136        self.assertRegex(stderr.getvalue(), expected)
137
138    def test_debuglevel_2(self):
139        mock_socket.reply_with(b"220 Hello world")
140        smtp = smtplib.SMTP()
141        smtp.set_debuglevel(2)
142        with support.captured_stderr() as stderr:
143            smtp.connect(HOST, self.port)
144        smtp.close()
145        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
146                              re.MULTILINE)
147        self.assertRegex(stderr.getvalue(), expected)
148
149
150# Test server thread using the specified SMTP server class
151def debugging_server(serv, serv_evt, client_evt):
152    serv_evt.set()
153
154    try:
155        if hasattr(select, 'poll'):
156            poll_fun = asyncore.poll2
157        else:
158            poll_fun = asyncore.poll
159
160        n = 1000
161        while asyncore.socket_map and n > 0:
162            poll_fun(0.01, asyncore.socket_map)
163
164            # when the client conversation is finished, it will
165            # set client_evt, and it's then ok to kill the server
166            if client_evt.is_set():
167                serv.close()
168                break
169
170            n -= 1
171
172    except socket.timeout:
173        pass
174    finally:
175        if not client_evt.is_set():
176            # allow some time for the client to read the result
177            time.sleep(0.5)
178            serv.close()
179        asyncore.close_all()
180        serv_evt.set()
181
182MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
183MSG_END = '------------ END MESSAGE ------------\n'
184
185# NOTE: Some SMTP objects in the tests below are created with a non-default
186# local_hostname argument to the constructor, since (on some systems) the FQDN
187# lookup caused by the default local_hostname sometimes takes so long that the
188# test server times out, causing the test to fail.
189
190# Test behavior of smtpd.DebuggingServer
191class DebuggingServerTests(unittest.TestCase):
192
193    maxDiff = None
194
195    def setUp(self):
196        self.real_getfqdn = socket.getfqdn
197        socket.getfqdn = mock_socket.getfqdn
198        # temporarily replace sys.stdout to capture DebuggingServer output
199        self.old_stdout = sys.stdout
200        self.output = io.StringIO()
201        sys.stdout = self.output
202
203        self.serv_evt = threading.Event()
204        self.client_evt = threading.Event()
205        # Capture SMTPChannel debug output
206        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
207        smtpd.DEBUGSTREAM = io.StringIO()
208        # Pick a random unused port by passing 0 for the port number
209        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
210                                          decode_data=True)
211        # Keep a note of what server host and port were assigned
212        self.host, self.port = self.serv.socket.getsockname()[:2]
213        serv_args = (self.serv, self.serv_evt, self.client_evt)
214        self.thread = threading.Thread(target=debugging_server, args=serv_args)
215        self.thread.start()
216
217        # wait until server thread has assigned a port number
218        self.serv_evt.wait()
219        self.serv_evt.clear()
220
221    def tearDown(self):
222        socket.getfqdn = self.real_getfqdn
223        # indicate that the client is finished
224        self.client_evt.set()
225        # wait for the server thread to terminate
226        self.serv_evt.wait()
227        self.thread.join()
228        # restore sys.stdout
229        sys.stdout = self.old_stdout
230        # restore DEBUGSTREAM
231        smtpd.DEBUGSTREAM.close()
232        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
233
234    def get_output_without_xpeer(self):
235        test_output = self.output.getvalue()
236        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
237                      test_output, flags=re.MULTILINE|re.DOTALL)
238
239    def testBasic(self):
240        # connect
241        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
242        smtp.quit()
243
244    def testSourceAddress(self):
245        # connect
246        src_port = support.find_unused_port()
247        try:
248            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
249                                timeout=3, source_address=(self.host, src_port))
250            self.assertEqual(smtp.source_address, (self.host, src_port))
251            self.assertEqual(smtp.local_hostname, 'localhost')
252            smtp.quit()
253        except OSError as e:
254            if e.errno == errno.EADDRINUSE:
255                self.skipTest("couldn't bind to source port %d" % src_port)
256            raise
257
258    def testNOOP(self):
259        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
260        expected = (250, b'OK')
261        self.assertEqual(smtp.noop(), expected)
262        smtp.quit()
263
264    def testRSET(self):
265        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
266        expected = (250, b'OK')
267        self.assertEqual(smtp.rset(), expected)
268        smtp.quit()
269
270    def testELHO(self):
271        # EHLO isn't implemented in DebuggingServer
272        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
273        expected = (250, b'\nSIZE 33554432\nHELP')
274        self.assertEqual(smtp.ehlo(), expected)
275        smtp.quit()
276
277    def testEXPNNotImplemented(self):
278        # EXPN isn't implemented in DebuggingServer
279        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
280        expected = (502, b'EXPN not implemented')
281        smtp.putcmd('EXPN')
282        self.assertEqual(smtp.getreply(), expected)
283        smtp.quit()
284
285    def testVRFY(self):
286        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
287        expected = (252, b'Cannot VRFY user, but will accept message ' + \
288                         b'and attempt delivery')
289        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
290        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
291        smtp.quit()
292
293    def testSecondHELO(self):
294        # check that a second HELO returns a message that it's a duplicate
295        # (this behavior is specific to smtpd.SMTPChannel)
296        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
297        smtp.helo()
298        expected = (503, b'Duplicate HELO/EHLO')
299        self.assertEqual(smtp.helo(), expected)
300        smtp.quit()
301
302    def testHELP(self):
303        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
304        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
305                                      b'RCPT DATA RSET NOOP QUIT VRFY')
306        smtp.quit()
307
308    def testSend(self):
309        # connect and send mail
310        m = 'A test message'
311        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
312        smtp.sendmail('John', 'Sally', m)
313        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
314        # in asyncore.  This sleep might help, but should really be fixed
315        # properly by using an Event variable.
316        time.sleep(0.01)
317        smtp.quit()
318
319        self.client_evt.set()
320        self.serv_evt.wait()
321        self.output.flush()
322        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
323        self.assertEqual(self.output.getvalue(), mexpect)
324
325    def testSendBinary(self):
326        m = b'A test message'
327        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
328        smtp.sendmail('John', 'Sally', m)
329        # XXX (see comment in testSend)
330        time.sleep(0.01)
331        smtp.quit()
332
333        self.client_evt.set()
334        self.serv_evt.wait()
335        self.output.flush()
336        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
337        self.assertEqual(self.output.getvalue(), mexpect)
338
339    def testSendNeedingDotQuote(self):
340        # Issue 12283
341        m = '.A test\n.mes.sage.'
342        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
343        smtp.sendmail('John', 'Sally', m)
344        # XXX (see comment in testSend)
345        time.sleep(0.01)
346        smtp.quit()
347
348        self.client_evt.set()
349        self.serv_evt.wait()
350        self.output.flush()
351        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
352        self.assertEqual(self.output.getvalue(), mexpect)
353
354    def testSendNullSender(self):
355        m = 'A test message'
356        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
357        smtp.sendmail('<>', 'Sally', m)
358        # XXX (see comment in testSend)
359        time.sleep(0.01)
360        smtp.quit()
361
362        self.client_evt.set()
363        self.serv_evt.wait()
364        self.output.flush()
365        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
366        self.assertEqual(self.output.getvalue(), mexpect)
367        debugout = smtpd.DEBUGSTREAM.getvalue()
368        sender = re.compile("^sender: <>$", re.MULTILINE)
369        self.assertRegex(debugout, sender)
370
371    def testSendMessage(self):
372        m = email.mime.text.MIMEText('A test message')
373        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
374        smtp.send_message(m, from_addr='John', to_addrs='Sally')
375        # XXX (see comment in testSend)
376        time.sleep(0.01)
377        smtp.quit()
378
379        self.client_evt.set()
380        self.serv_evt.wait()
381        self.output.flush()
382        # Remove the X-Peer header that DebuggingServer adds as figuring out
383        # exactly what IP address format is put there is not easy (and
384        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
385        # not always the same as socket.gethostbyname(HOST). :(
386        test_output = self.get_output_without_xpeer()
387        del m['X-Peer']
388        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
389        self.assertEqual(test_output, mexpect)
390
391    def testSendMessageWithAddresses(self):
392        m = email.mime.text.MIMEText('A test message')
393        m['From'] = 'foo@bar.com'
394        m['To'] = 'John'
395        m['CC'] = 'Sally, Fred'
396        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
397        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
398        smtp.send_message(m)
399        # XXX (see comment in testSend)
400        time.sleep(0.01)
401        smtp.quit()
402        # make sure the Bcc header is still in the message.
403        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
404                                    '<warped@silly.walks.com>')
405
406        self.client_evt.set()
407        self.serv_evt.wait()
408        self.output.flush()
409        # Remove the X-Peer header that DebuggingServer adds.
410        test_output = self.get_output_without_xpeer()
411        del m['X-Peer']
412        # The Bcc header should not be transmitted.
413        del m['Bcc']
414        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
415        self.assertEqual(test_output, mexpect)
416        debugout = smtpd.DEBUGSTREAM.getvalue()
417        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
418        self.assertRegex(debugout, sender)
419        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
420                     'warped@silly.walks.com'):
421            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
422                                 re.MULTILINE)
423            self.assertRegex(debugout, to_addr)
424
425    def testSendMessageWithSomeAddresses(self):
426        # Make sure nothing breaks if not all of the three 'to' headers exist
427        m = email.mime.text.MIMEText('A test message')
428        m['From'] = 'foo@bar.com'
429        m['To'] = 'John, Dinsdale'
430        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
431        smtp.send_message(m)
432        # XXX (see comment in testSend)
433        time.sleep(0.01)
434        smtp.quit()
435
436        self.client_evt.set()
437        self.serv_evt.wait()
438        self.output.flush()
439        # Remove the X-Peer header that DebuggingServer adds.
440        test_output = self.get_output_without_xpeer()
441        del m['X-Peer']
442        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
443        self.assertEqual(test_output, mexpect)
444        debugout = smtpd.DEBUGSTREAM.getvalue()
445        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
446        self.assertRegex(debugout, sender)
447        for addr in ('John', 'Dinsdale'):
448            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
449                                 re.MULTILINE)
450            self.assertRegex(debugout, to_addr)
451
452    def testSendMessageWithSpecifiedAddresses(self):
453        # Make sure addresses specified in call override those in message.
454        m = email.mime.text.MIMEText('A test message')
455        m['From'] = 'foo@bar.com'
456        m['To'] = 'John, Dinsdale'
457        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
458        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
459        # XXX (see comment in testSend)
460        time.sleep(0.01)
461        smtp.quit()
462
463        self.client_evt.set()
464        self.serv_evt.wait()
465        self.output.flush()
466        # Remove the X-Peer header that DebuggingServer adds.
467        test_output = self.get_output_without_xpeer()
468        del m['X-Peer']
469        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
470        self.assertEqual(test_output, mexpect)
471        debugout = smtpd.DEBUGSTREAM.getvalue()
472        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
473        self.assertRegex(debugout, sender)
474        for addr in ('John', 'Dinsdale'):
475            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
476                                 re.MULTILINE)
477            self.assertNotRegex(debugout, to_addr)
478        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
479        self.assertRegex(debugout, recip)
480
481    def testSendMessageWithMultipleFrom(self):
482        # Sender overrides To
483        m = email.mime.text.MIMEText('A test message')
484        m['From'] = 'Bernard, Bianca'
485        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
486        m['To'] = 'John, Dinsdale'
487        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
488        smtp.send_message(m)
489        # XXX (see comment in testSend)
490        time.sleep(0.01)
491        smtp.quit()
492
493        self.client_evt.set()
494        self.serv_evt.wait()
495        self.output.flush()
496        # Remove the X-Peer header that DebuggingServer adds.
497        test_output = self.get_output_without_xpeer()
498        del m['X-Peer']
499        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
500        self.assertEqual(test_output, mexpect)
501        debugout = smtpd.DEBUGSTREAM.getvalue()
502        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
503        self.assertRegex(debugout, sender)
504        for addr in ('John', 'Dinsdale'):
505            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
506                                 re.MULTILINE)
507            self.assertRegex(debugout, to_addr)
508
509    def testSendMessageResent(self):
510        m = email.mime.text.MIMEText('A test message')
511        m['From'] = 'foo@bar.com'
512        m['To'] = 'John'
513        m['CC'] = 'Sally, Fred'
514        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
515        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
516        m['Resent-From'] = 'holy@grail.net'
517        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
518        m['Resent-Bcc'] = 'doe@losthope.net'
519        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
520        smtp.send_message(m)
521        # XXX (see comment in testSend)
522        time.sleep(0.01)
523        smtp.quit()
524
525        self.client_evt.set()
526        self.serv_evt.wait()
527        self.output.flush()
528        # The Resent-Bcc headers are deleted before serialization.
529        del m['Bcc']
530        del m['Resent-Bcc']
531        # Remove the X-Peer header that DebuggingServer adds.
532        test_output = self.get_output_without_xpeer()
533        del m['X-Peer']
534        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
535        self.assertEqual(test_output, mexpect)
536        debugout = smtpd.DEBUGSTREAM.getvalue()
537        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
538        self.assertRegex(debugout, sender)
539        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
540            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
541                                 re.MULTILINE)
542            self.assertRegex(debugout, to_addr)
543
544    def testSendMessageMultipleResentRaises(self):
545        m = email.mime.text.MIMEText('A test message')
546        m['From'] = 'foo@bar.com'
547        m['To'] = 'John'
548        m['CC'] = 'Sally, Fred'
549        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
550        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
551        m['Resent-From'] = 'holy@grail.net'
552        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
553        m['Resent-Bcc'] = 'doe@losthope.net'
554        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
555        m['Resent-To'] = 'holy@grail.net'
556        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
557        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
558        with self.assertRaises(ValueError):
559            smtp.send_message(m)
560        smtp.close()
561
562class NonConnectingTests(unittest.TestCase):
563
564    def testNotConnected(self):
565        # Test various operations on an unconnected SMTP object that
566        # should raise exceptions (at present the attempt in SMTP.send
567        # to reference the nonexistent 'sock' attribute of the SMTP object
568        # causes an AttributeError)
569        smtp = smtplib.SMTP()
570        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
571        self.assertRaises(smtplib.SMTPServerDisconnected,
572                          smtp.send, 'test msg')
573
574    def testNonnumericPort(self):
575        # check that non-numeric port raises OSError
576        self.assertRaises(OSError, smtplib.SMTP,
577                          "localhost", "bogus")
578        self.assertRaises(OSError, smtplib.SMTP,
579                          "localhost:bogus")
580
581
582class DefaultArgumentsTests(unittest.TestCase):
583
584    def setUp(self):
585        self.msg = EmailMessage()
586        self.msg['From'] = 'Páolo <főo@bar.com>'
587        self.smtp = smtplib.SMTP()
588        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
589        self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
590
591    def testSendMessage(self):
592        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
593        self.smtp.send_message(self.msg)
594        self.smtp.send_message(self.msg)
595        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
596                         expected_mail_options)
597        self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
598                         expected_mail_options)
599
600    def testSendMessageWithMailOptions(self):
601        mail_options = ['STARTTLS']
602        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
603        self.smtp.send_message(self.msg, None, None, mail_options)
604        self.assertEqual(mail_options, ['STARTTLS'])
605        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
606                         expected_mail_options)
607
608
609# test response of client to a non-successful HELO message
610class BadHELOServerTests(unittest.TestCase):
611
612    def setUp(self):
613        smtplib.socket = mock_socket
614        mock_socket.reply_with(b"199 no hello for you!")
615        self.old_stdout = sys.stdout
616        self.output = io.StringIO()
617        sys.stdout = self.output
618        self.port = 25
619
620    def tearDown(self):
621        smtplib.socket = socket
622        sys.stdout = self.old_stdout
623
624    def testFailingHELO(self):
625        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
626                            HOST, self.port, 'localhost', 3)
627
628
629class TooLongLineTests(unittest.TestCase):
630    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
631
632    def setUp(self):
633        self.old_stdout = sys.stdout
634        self.output = io.StringIO()
635        sys.stdout = self.output
636
637        self.evt = threading.Event()
638        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
639        self.sock.settimeout(15)
640        self.port = support.bind_port(self.sock)
641        servargs = (self.evt, self.respdata, self.sock)
642        thread = threading.Thread(target=server, args=servargs)
643        thread.start()
644        self.addCleanup(thread.join)
645        self.evt.wait()
646        self.evt.clear()
647
648    def tearDown(self):
649        self.evt.wait()
650        sys.stdout = self.old_stdout
651
652    def testLineTooLong(self):
653        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
654                          HOST, self.port, 'localhost', 3)
655
656
657sim_users = {'Mr.A@somewhere.com':'John A',
658             'Ms.B@xn--fo-fka.com':'Sally B',
659             'Mrs.C@somewhereesle.com':'Ruth C',
660            }
661
662sim_auth = ('Mr.A@somewhere.com', 'somepassword')
663sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
664                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
665sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
666             'list-2':['Ms.B@xn--fo-fka.com',],
667            }
668
669# Simulated SMTP channel & server
670class ResponseException(Exception): pass
671class SimSMTPChannel(smtpd.SMTPChannel):
672
673    quit_response = None
674    mail_response = None
675    rcpt_response = None
676    data_response = None
677    rcpt_count = 0
678    rset_count = 0
679    disconnect = 0
680    AUTH = 99    # Add protocol state to enable auth testing.
681    authenticated_user = None
682
683    def __init__(self, extra_features, *args, **kw):
684        self._extrafeatures = ''.join(
685            [ "250-{0}\r\n".format(x) for x in extra_features ])
686        super(SimSMTPChannel, self).__init__(*args, **kw)
687
688    # AUTH related stuff.  It would be nice if support for this were in smtpd.
689    def found_terminator(self):
690        if self.smtp_state == self.AUTH:
691            line = self._emptystring.join(self.received_lines)
692            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
693            self.received_lines = []
694            try:
695                self.auth_object(line)
696            except ResponseException as e:
697                self.smtp_state = self.COMMAND
698                self.push('%s %s' % (e.smtp_code, e.smtp_error))
699                return
700        super().found_terminator()
701
702
703    def smtp_AUTH(self, arg):
704        if not self.seen_greeting:
705            self.push('503 Error: send EHLO first')
706            return
707        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
708            self.push('500 Error: command "AUTH" not recognized')
709            return
710        if self.authenticated_user is not None:
711            self.push(
712                '503 Bad sequence of commands: already authenticated')
713            return
714        args = arg.split()
715        if len(args) not in [1, 2]:
716            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
717            return
718        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
719        try:
720            self.auth_object = getattr(self, auth_object_name)
721        except AttributeError:
722            self.push('504 Command parameter not implemented: unsupported '
723                      ' authentication mechanism {!r}'.format(auth_object_name))
724            return
725        self.smtp_state = self.AUTH
726        self.auth_object(args[1] if len(args) == 2 else None)
727
728    def _authenticated(self, user, valid):
729        if valid:
730            self.authenticated_user = user
731            self.push('235 Authentication Succeeded')
732        else:
733            self.push('535 Authentication credentials invalid')
734        self.smtp_state = self.COMMAND
735
736    def _decode_base64(self, string):
737        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
738
739    def _auth_plain(self, arg=None):
740        if arg is None:
741            self.push('334 ')
742        else:
743            logpass = self._decode_base64(arg)
744            try:
745                *_, user, password = logpass.split('\0')
746            except ValueError as e:
747                self.push('535 Splitting response {!r} into user and password'
748                          ' failed: {}'.format(logpass, e))
749                return
750            self._authenticated(user, password == sim_auth[1])
751
752    def _auth_login(self, arg=None):
753        if arg is None:
754            # base64 encoded 'Username:'
755            self.push('334 VXNlcm5hbWU6')
756        elif not hasattr(self, '_auth_login_user'):
757            self._auth_login_user = self._decode_base64(arg)
758            # base64 encoded 'Password:'
759            self.push('334 UGFzc3dvcmQ6')
760        else:
761            password = self._decode_base64(arg)
762            self._authenticated(self._auth_login_user, password == sim_auth[1])
763            del self._auth_login_user
764
765    def _auth_cram_md5(self, arg=None):
766        if arg is None:
767            self.push('334 {}'.format(sim_cram_md5_challenge))
768        else:
769            logpass = self._decode_base64(arg)
770            try:
771                user, hashed_pass = logpass.split()
772            except ValueError as e:
773                self.push('535 Splitting response {!r} into user and password '
774                          'failed: {}'.format(logpass, e))
775                return False
776            valid_hashed_pass = hmac.HMAC(
777                sim_auth[1].encode('ascii'),
778                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
779                'md5').hexdigest()
780            self._authenticated(user, hashed_pass == valid_hashed_pass)
781    # end AUTH related stuff.
782
783    def smtp_EHLO(self, arg):
784        resp = ('250-testhost\r\n'
785                '250-EXPN\r\n'
786                '250-SIZE 20000000\r\n'
787                '250-STARTTLS\r\n'
788                '250-DELIVERBY\r\n')
789        resp = resp + self._extrafeatures + '250 HELP'
790        self.push(resp)
791        self.seen_greeting = arg
792        self.extended_smtp = True
793
794    def smtp_VRFY(self, arg):
795        # For max compatibility smtplib should be sending the raw address.
796        if arg in sim_users:
797            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
798        else:
799            self.push('550 No such user: %s' % arg)
800
801    def smtp_EXPN(self, arg):
802        list_name = arg.lower()
803        if list_name in sim_lists:
804            user_list = sim_lists[list_name]
805            for n, user_email in enumerate(user_list):
806                quoted_addr = smtplib.quoteaddr(user_email)
807                if n < len(user_list) - 1:
808                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
809                else:
810                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
811        else:
812            self.push('550 No access for you!')
813
814    def smtp_QUIT(self, arg):
815        if self.quit_response is None:
816            super(SimSMTPChannel, self).smtp_QUIT(arg)
817        else:
818            self.push(self.quit_response)
819            self.close_when_done()
820
821    def smtp_MAIL(self, arg):
822        if self.mail_response is None:
823            super().smtp_MAIL(arg)
824        else:
825            self.push(self.mail_response)
826            if self.disconnect:
827                self.close_when_done()
828
829    def smtp_RCPT(self, arg):
830        if self.rcpt_response is None:
831            super().smtp_RCPT(arg)
832            return
833        self.rcpt_count += 1
834        self.push(self.rcpt_response[self.rcpt_count-1])
835
836    def smtp_RSET(self, arg):
837        self.rset_count += 1
838        super().smtp_RSET(arg)
839
840    def smtp_DATA(self, arg):
841        if self.data_response is None:
842            super().smtp_DATA(arg)
843        else:
844            self.push(self.data_response)
845
846    def handle_error(self):
847        raise
848
849
850class SimSMTPServer(smtpd.SMTPServer):
851
852    channel_class = SimSMTPChannel
853
854    def __init__(self, *args, **kw):
855        self._extra_features = []
856        self._addresses = {}
857        smtpd.SMTPServer.__init__(self, *args, **kw)
858
859    def handle_accepted(self, conn, addr):
860        self._SMTPchannel = self.channel_class(
861            self._extra_features, self, conn, addr,
862            decode_data=self._decode_data)
863
864    def process_message(self, peer, mailfrom, rcpttos, data):
865        self._addresses['from'] = mailfrom
866        self._addresses['tos'] = rcpttos
867
868    def add_feature(self, feature):
869        self._extra_features.append(feature)
870
871    def handle_error(self):
872        raise
873
874
875# Test various SMTP & ESMTP commands/behaviors that require a simulated server
876# (i.e., something with more features than DebuggingServer)
877class SMTPSimTests(unittest.TestCase):
878
879    def setUp(self):
880        self.real_getfqdn = socket.getfqdn
881        socket.getfqdn = mock_socket.getfqdn
882        self.serv_evt = threading.Event()
883        self.client_evt = threading.Event()
884        # Pick a random unused port by passing 0 for the port number
885        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
886        # Keep a note of what port was assigned
887        self.port = self.serv.socket.getsockname()[1]
888        serv_args = (self.serv, self.serv_evt, self.client_evt)
889        self.thread = threading.Thread(target=debugging_server, args=serv_args)
890        self.thread.start()
891
892        # wait until server thread has assigned a port number
893        self.serv_evt.wait()
894        self.serv_evt.clear()
895
896    def tearDown(self):
897        socket.getfqdn = self.real_getfqdn
898        # indicate that the client is finished
899        self.client_evt.set()
900        # wait for the server thread to terminate
901        self.serv_evt.wait()
902        self.thread.join()
903
904    def testBasic(self):
905        # smoke test
906        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
907        smtp.quit()
908
909    def testEHLO(self):
910        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
911
912        # no features should be present before the EHLO
913        self.assertEqual(smtp.esmtp_features, {})
914
915        # features expected from the test server
916        expected_features = {'expn':'',
917                             'size': '20000000',
918                             'starttls': '',
919                             'deliverby': '',
920                             'help': '',
921                             }
922
923        smtp.ehlo()
924        self.assertEqual(smtp.esmtp_features, expected_features)
925        for k in expected_features:
926            self.assertTrue(smtp.has_extn(k))
927        self.assertFalse(smtp.has_extn('unsupported-feature'))
928        smtp.quit()
929
930    def testVRFY(self):
931        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
932
933        for addr_spec, name in sim_users.items():
934            expected_known = (250, bytes('%s %s' %
935                                         (name, smtplib.quoteaddr(addr_spec)),
936                                         "ascii"))
937            self.assertEqual(smtp.vrfy(addr_spec), expected_known)
938
939        u = 'nobody@nowhere.com'
940        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
941        self.assertEqual(smtp.vrfy(u), expected_unknown)
942        smtp.quit()
943
944    def testEXPN(self):
945        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
946
947        for listname, members in sim_lists.items():
948            users = []
949            for m in members:
950                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
951            expected_known = (250, bytes('\n'.join(users), "ascii"))
952            self.assertEqual(smtp.expn(listname), expected_known)
953
954        u = 'PSU-Members-List'
955        expected_unknown = (550, b'No access for you!')
956        self.assertEqual(smtp.expn(u), expected_unknown)
957        smtp.quit()
958
959    def testAUTH_PLAIN(self):
960        self.serv.add_feature("AUTH PLAIN")
961        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
962        resp = smtp.login(sim_auth[0], sim_auth[1])
963        self.assertEqual(resp, (235, b'Authentication Succeeded'))
964        smtp.close()
965
966    def testAUTH_LOGIN(self):
967        self.serv.add_feature("AUTH LOGIN")
968        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
969        resp = smtp.login(sim_auth[0], sim_auth[1])
970        self.assertEqual(resp, (235, b'Authentication Succeeded'))
971        smtp.close()
972
973    def testAUTH_CRAM_MD5(self):
974        self.serv.add_feature("AUTH CRAM-MD5")
975        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
976        resp = smtp.login(sim_auth[0], sim_auth[1])
977        self.assertEqual(resp, (235, b'Authentication Succeeded'))
978        smtp.close()
979
980    def testAUTH_multiple(self):
981        # Test that multiple authentication methods are tried.
982        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
983        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
984        resp = smtp.login(sim_auth[0], sim_auth[1])
985        self.assertEqual(resp, (235, b'Authentication Succeeded'))
986        smtp.close()
987
988    def test_auth_function(self):
989        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
990        for mechanism in supported:
991            self.serv.add_feature("AUTH {}".format(mechanism))
992        for mechanism in supported:
993            with self.subTest(mechanism=mechanism):
994                smtp = smtplib.SMTP(HOST, self.port,
995                                    local_hostname='localhost', timeout=15)
996                smtp.ehlo('foo')
997                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
998                method = 'auth_' + mechanism.lower().replace('-', '_')
999                resp = smtp.auth(mechanism, getattr(smtp, method))
1000                self.assertEqual(resp, (235, b'Authentication Succeeded'))
1001                smtp.close()
1002
1003    def test_quit_resets_greeting(self):
1004        smtp = smtplib.SMTP(HOST, self.port,
1005                            local_hostname='localhost',
1006                            timeout=15)
1007        code, message = smtp.ehlo()
1008        self.assertEqual(code, 250)
1009        self.assertIn('size', smtp.esmtp_features)
1010        smtp.quit()
1011        self.assertNotIn('size', smtp.esmtp_features)
1012        smtp.connect(HOST, self.port)
1013        self.assertNotIn('size', smtp.esmtp_features)
1014        smtp.ehlo_or_helo_if_needed()
1015        self.assertIn('size', smtp.esmtp_features)
1016        smtp.quit()
1017
1018    def test_with_statement(self):
1019        with smtplib.SMTP(HOST, self.port) as smtp:
1020            code, message = smtp.noop()
1021            self.assertEqual(code, 250)
1022        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
1023        with smtplib.SMTP(HOST, self.port) as smtp:
1024            smtp.close()
1025        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
1026
1027    def test_with_statement_QUIT_failure(self):
1028        with self.assertRaises(smtplib.SMTPResponseException) as error:
1029            with smtplib.SMTP(HOST, self.port) as smtp:
1030                smtp.noop()
1031                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
1032        self.assertEqual(error.exception.smtp_code, 421)
1033        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
1034
1035    #TODO: add tests for correct AUTH method fallback now that the
1036    #test infrastructure can support it.
1037
1038    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
1039    def test__rest_from_mail_cmd(self):
1040        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1041        smtp.noop()
1042        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
1043        self.serv._SMTPchannel.disconnect = True
1044        with self.assertRaises(smtplib.SMTPSenderRefused):
1045            smtp.sendmail('John', 'Sally', 'test message')
1046        self.assertIsNone(smtp.sock)
1047
1048    # Issue 5713: make sure close, not rset, is called if we get a 421 error
1049    def test_421_from_mail_cmd(self):
1050        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1051        smtp.noop()
1052        self.serv._SMTPchannel.mail_response = '421 closing connection'
1053        with self.assertRaises(smtplib.SMTPSenderRefused):
1054            smtp.sendmail('John', 'Sally', 'test message')
1055        self.assertIsNone(smtp.sock)
1056        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1057
1058    def test_421_from_rcpt_cmd(self):
1059        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1060        smtp.noop()
1061        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
1062        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
1063            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
1064        self.assertIsNone(smtp.sock)
1065        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1066        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
1067
1068    def test_421_from_data_cmd(self):
1069        class MySimSMTPChannel(SimSMTPChannel):
1070            def found_terminator(self):
1071                if self.smtp_state == self.DATA:
1072                    self.push('421 closing')
1073                else:
1074                    super().found_terminator()
1075        self.serv.channel_class = MySimSMTPChannel
1076        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1077        smtp.noop()
1078        with self.assertRaises(smtplib.SMTPDataError):
1079            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
1080        self.assertIsNone(smtp.sock)
1081        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
1082
1083    def test_smtputf8_NotSupportedError_if_no_server_support(self):
1084        smtp = smtplib.SMTP(
1085            HOST, self.port, local_hostname='localhost', timeout=3)
1086        self.addCleanup(smtp.close)
1087        smtp.ehlo()
1088        self.assertTrue(smtp.does_esmtp)
1089        self.assertFalse(smtp.has_extn('smtputf8'))
1090        self.assertRaises(
1091            smtplib.SMTPNotSupportedError,
1092            smtp.sendmail,
1093            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1094        self.assertRaises(
1095            smtplib.SMTPNotSupportedError,
1096            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
1097
1098    def test_send_unicode_without_SMTPUTF8(self):
1099        smtp = smtplib.SMTP(
1100            HOST, self.port, local_hostname='localhost', timeout=3)
1101        self.addCleanup(smtp.close)
1102        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
1103        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
1104
1105    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
1106        # This test is located here and not in the SMTPUTF8SimTests
1107        # class because it needs a "regular" SMTP server to work
1108        msg = EmailMessage()
1109        msg['From'] = "Páolo <főo@bar.com>"
1110        msg['To'] = 'Dinsdale'
1111        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1112        smtp = smtplib.SMTP(
1113            HOST, self.port, local_hostname='localhost', timeout=3)
1114        self.addCleanup(smtp.close)
1115        with self.assertRaises(smtplib.SMTPNotSupportedError):
1116            smtp.send_message(msg)
1117
1118    def test_name_field_not_included_in_envelop_addresses(self):
1119        smtp = smtplib.SMTP(
1120            HOST, self.port, local_hostname='localhost', timeout=3
1121        )
1122        self.addCleanup(smtp.close)
1123
1124        message = EmailMessage()
1125        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
1126        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))
1127
1128        self.assertDictEqual(smtp.send_message(message), {})
1129
1130        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
1131        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1132
1133
1134class SimSMTPUTF8Server(SimSMTPServer):
1135
1136    def __init__(self, *args, **kw):
1137        # The base SMTP server turns these on automatically, but our test
1138        # server is set up to munge the EHLO response, so we need to provide
1139        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
1140        self._extra_features = ['SMTPUTF8', '8BITMIME']
1141        smtpd.SMTPServer.__init__(self, *args, **kw)
1142
1143    def handle_accepted(self, conn, addr):
1144        self._SMTPchannel = self.channel_class(
1145            self._extra_features, self, conn, addr,
1146            decode_data=self._decode_data,
1147            enable_SMTPUTF8=self.enable_SMTPUTF8,
1148        )
1149
1150    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
1151                                                             rcpt_options=None):
1152        self.last_peer = peer
1153        self.last_mailfrom = mailfrom
1154        self.last_rcpttos = rcpttos
1155        self.last_message = data
1156        self.last_mail_options = mail_options
1157        self.last_rcpt_options = rcpt_options
1158
1159
1160class SMTPUTF8SimTests(unittest.TestCase):
1161
1162    maxDiff = None
1163
1164    def setUp(self):
1165        self.real_getfqdn = socket.getfqdn
1166        socket.getfqdn = mock_socket.getfqdn
1167        self.serv_evt = threading.Event()
1168        self.client_evt = threading.Event()
1169        # Pick a random unused port by passing 0 for the port number
1170        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
1171                                      decode_data=False,
1172                                      enable_SMTPUTF8=True)
1173        # Keep a note of what port was assigned
1174        self.port = self.serv.socket.getsockname()[1]
1175        serv_args = (self.serv, self.serv_evt, self.client_evt)
1176        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1177        self.thread.start()
1178
1179        # wait until server thread has assigned a port number
1180        self.serv_evt.wait()
1181        self.serv_evt.clear()
1182
1183    def tearDown(self):
1184        socket.getfqdn = self.real_getfqdn
1185        # indicate that the client is finished
1186        self.client_evt.set()
1187        # wait for the server thread to terminate
1188        self.serv_evt.wait()
1189        self.thread.join()
1190
1191    def test_test_server_supports_extensions(self):
1192        smtp = smtplib.SMTP(
1193            HOST, self.port, local_hostname='localhost', timeout=3)
1194        self.addCleanup(smtp.close)
1195        smtp.ehlo()
1196        self.assertTrue(smtp.does_esmtp)
1197        self.assertTrue(smtp.has_extn('smtputf8'))
1198
1199    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
1200        m = '¡a test message containing unicode!'.encode('utf-8')
1201        smtp = smtplib.SMTP(
1202            HOST, self.port, local_hostname='localhost', timeout=3)
1203        self.addCleanup(smtp.close)
1204        smtp.sendmail('Jőhn', 'Sálly', m,
1205                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1206        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
1207        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
1208        self.assertEqual(self.serv.last_message, m)
1209        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1210        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1211        self.assertEqual(self.serv.last_rcpt_options, [])
1212
1213    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
1214        m = '¡a test message containing unicode!'.encode('utf-8')
1215        smtp = smtplib.SMTP(
1216            HOST, self.port, local_hostname='localhost', timeout=3)
1217        self.addCleanup(smtp.close)
1218        smtp.ehlo()
1219        self.assertEqual(
1220            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
1221            (250, b'OK'))
1222        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
1223        self.assertEqual(smtp.data(m), (250, b'OK'))
1224        self.assertEqual(self.serv.last_mailfrom, 'Jő')
1225        self.assertEqual(self.serv.last_rcpttos, ['János'])
1226        self.assertEqual(self.serv.last_message, m)
1227        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1228        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1229        self.assertEqual(self.serv.last_rcpt_options, [])
1230
1231    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
1232        msg = EmailMessage()
1233        msg['From'] = "Páolo <főo@bar.com>"
1234        msg['To'] = 'Dinsdale'
1235        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1236        # XXX I don't know why I need two \n's here, but this is an existing
1237        # bug (if it is one) and not a problem with the new functionality.
1238        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
1239        # XXX smtpd converts received /r/n to /n, so we can't easily test that
1240        # we are successfully sending /r/n :(.
1241        expected = textwrap.dedent("""\
1242            From: Páolo <főo@bar.com>
1243            To: Dinsdale
1244            Subject: Nudge nudge, wink, wink \u1F609
1245            Content-Type: text/plain; charset="utf-8"
1246            Content-Transfer-Encoding: 8bit
1247            MIME-Version: 1.0
1248
1249            oh là là, know what I mean, know what I mean?
1250            """)
1251        smtp = smtplib.SMTP(
1252            HOST, self.port, local_hostname='localhost', timeout=3)
1253        self.addCleanup(smtp.close)
1254        self.assertEqual(smtp.send_message(msg), {})
1255        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
1256        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
1257        self.assertEqual(self.serv.last_message.decode(), expected)
1258        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1259        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1260        self.assertEqual(self.serv.last_rcpt_options, [])
1261
1262
1263EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1264
1265class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
1266    def smtp_AUTH(self, arg):
1267        # RFC 4954's AUTH command allows for an optional initial-response.
1268        # Not all AUTH methods support this; some require a challenge.  AUTH
1269        # PLAIN does those, so test that here.  See issue #15014.
1270        args = arg.split()
1271        if args[0].lower() == 'plain':
1272            if len(args) == 2:
1273                # AUTH PLAIN <initial-response> with the response base 64
1274                # encoded.  Hard code the expected response for the test.
1275                if args[1] == EXPECTED_RESPONSE:
1276                    self.push('235 Ok')
1277                    return
1278        self.push('571 Bad authentication')
1279
1280class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
1281    channel_class = SimSMTPAUTHInitialResponseChannel
1282
1283
1284class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
1285    def setUp(self):
1286        self.real_getfqdn = socket.getfqdn
1287        socket.getfqdn = mock_socket.getfqdn
1288        self.serv_evt = threading.Event()
1289        self.client_evt = threading.Event()
1290        # Pick a random unused port by passing 0 for the port number
1291        self.serv = SimSMTPAUTHInitialResponseServer(
1292            (HOST, 0), ('nowhere', -1), decode_data=True)
1293        # Keep a note of what port was assigned
1294        self.port = self.serv.socket.getsockname()[1]
1295        serv_args = (self.serv, self.serv_evt, self.client_evt)
1296        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1297        self.thread.start()
1298
1299        # wait until server thread has assigned a port number
1300        self.serv_evt.wait()
1301        self.serv_evt.clear()
1302
1303    def tearDown(self):
1304        socket.getfqdn = self.real_getfqdn
1305        # indicate that the client is finished
1306        self.client_evt.set()
1307        # wait for the server thread to terminate
1308        self.serv_evt.wait()
1309        self.thread.join()
1310
1311    def testAUTH_PLAIN_initial_response_login(self):
1312        self.serv.add_feature('AUTH PLAIN')
1313        smtp = smtplib.SMTP(HOST, self.port,
1314                            local_hostname='localhost', timeout=15)
1315        smtp.login('psu', 'doesnotexist')
1316        smtp.close()
1317
1318    def testAUTH_PLAIN_initial_response_auth(self):
1319        self.serv.add_feature('AUTH PLAIN')
1320        smtp = smtplib.SMTP(HOST, self.port,
1321                            local_hostname='localhost', timeout=15)
1322        smtp.user = 'psu'
1323        smtp.password = 'doesnotexist'
1324        code, response = smtp.auth('plain', smtp.auth_plain)
1325        smtp.close()
1326        self.assertEqual(code, 235)
1327
1328
1329if __name__ == '__main__':
1330    unittest.main()
1331