• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hashlib
8import hmac
9import socket
10import smtpd
11import smtplib
12import io
13import re
14import sys
15import time
16import select
17import errno
18import textwrap
19import threading
20
21import unittest
22from test import support, mock_socket
23from test.support import HOST
24from test.support import threading_setup, threading_cleanup, join_thread
25from test.support import requires_hashdigest
26from unittest.mock import Mock
27
28
if sys.platform == 'darwin':
    # On darwin, select.poll reports a select.POLLHUP once the tests are
    # over; install a do-nothing handler so the event is simply ignored.
    def handle_expt(self):
        return None
    setattr(smtpd.SMTPChannel, 'handle_expt', handle_expt)
35
36
def server(evt, buf, serv):
    """Accept one connection on *serv* and write *buf* to it.

    Intended to run in a helper thread.

    evt  -- event that is set once *serv* is listening, and set again when
            the function is finished (the caller clears it in between).
    buf  -- bytes to send to the accepted connection.
    serv -- bound socket-like object; it is always closed before returning.

    A socket.timeout raised by accept() is swallowed.  Any other error
    propagates to the caller, but only after both the accepted connection
    and *serv* have been closed.
    """
    serv.listen()
    evt.set()
    try:
        conn, _ = serv.accept()
    except socket.timeout:
        pass
    else:
        try:
            # Bound the number of select/send rounds so that a stalled
            # client cannot keep this thread alive forever.
            for _ in range(500):
                if not buf:
                    break
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]
        finally:
            # Close the accepted connection even if select()/send() raised
            # (e.g. the client hung up early); otherwise it would leak.
            conn.close()
    finally:
        serv.close()
        evt.set()
58
class GeneralTests(unittest.TestCase):
    """Basic smtplib.SMTP checks that run against the mock_socket module."""

    def setUp(self):
        # Make smtplib use the mock socket module instead of the real one.
        self.port = 25
        smtplib.socket = mock_socket

    def tearDown(self):
        # Undo the substitution made in setUp.
        smtplib.socket = socket

    def testQuoteData(self):
        # quotedata() is no longer used but is retained for backward
        # compatibility, so make sure it still works.
        raw = "abc\n.jkl\rfoo\r\n..blue"
        quoted = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(quoted, smtplib.quotedata(raw))

    def testBasic1(self):
        # Connect with separate host and port arguments.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        # The source_address argument must be stored on the instance.
        mock_socket.reply_with(b"220 Hola mundo")
        source = ('127.0.0.1', 19876)
        client = smtplib.SMTP(HOST, self.port, source_address=source)
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        # Connect with the port embedded in the host name.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP("{}:{}".format(HOST, self.port))
        client.close()

    def testLocalHostName(self):
        # A caller-supplied local_hostname must be the one that is used.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        # Without an explicit timeout the module-wide default applies.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = smtplib.SMTP(HOST, self.port)
        finally:
            # Always restore the default, even if the connect failed.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        # timeout=None must win over a non-None default timeout.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutValue(self):
        # An explicit timeout is applied to the socket.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        # Debug level 1 writes plain "connect:" lines to stderr.
        mock_socket.reply_with(b"220 Hello world")
        client = smtplib.SMTP()
        client.set_debuglevel(1)
        with support.captured_stderr() as captured:
            client.connect(HOST, self.port)
        client.close()
        self.assertRegex(captured.getvalue(),
                         re.compile(r"^connect:", re.MULTILINE))

    def test_debuglevel_2(self):
        # Debug level 2 prefixes each line with an HH:MM:SS.ffffff timestamp.
        mock_socket.reply_with(b"220 Hello world")
        client = smtplib.SMTP()
        client.set_debuglevel(2)
        with support.captured_stderr() as captured:
            client.connect(HOST, self.port)
        client.close()
        stamped = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                             re.MULTILINE)
        self.assertRegex(captured.getvalue(), stamped)
152
153# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for *serv* until the client is finished.

    serv       -- server object; closed here when the loop ends.
    serv_evt   -- event set once at startup and once more on exit.
    client_evt -- event the client sets when its conversation is over.
    """
    serv_evt.set()

    try:
        # Use the poll()-based loop wherever select.poll is available.
        poller = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        # At most 1000 polling rounds, so the thread cannot spin forever.
        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poller(0.01, asyncore.socket_map)

            # Once the client conversation is finished it sets client_evt,
            # and it is then safe to shut the server down.
            if client_evt.is_set():
                serv.close()
                break
    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # Give the client some time to read the result first.
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
184
# Delimiter lines the tests expect before and after each message in the
# captured server output.
MSG_BEGIN = "---------- MESSAGE FOLLOWS ----------\n"
MSG_END = "------------ END MESSAGE ------------\n"
187
188# NOTE: Some SMTP objects in the tests below are created with a non-default
189# local_hostname argument to the constructor, since (on some systems) the FQDN
190# lookup caused by the default local_hostname sometimes takes so long that the
191# test server times out, causing the test to fail.
192
193# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    """Run smtplib.SMTP against an smtpd.DebuggingServer in a helper thread.

    setUp starts the server on an unused port and redirects sys.stdout and
    smtpd.DEBUGSTREAM into StringIO buffers, so that the tests can compare
    what the server printed with what the client sent.
    """

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def get_output_without_xpeer(self):
        """Return the captured server output with the X-Peer line removed."""
        test_output = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      test_output, flags=re.MULTILINE|re.DOTALL)

    # ---- private helpers shared by the tests below ----

    def _connect(self, host=None, **kwargs):
        """Connect to the test server and register smtp.close as a cleanup.

        *host* defaults to HOST; extra keyword arguments are passed on to
        smtplib.SMTP.  local_hostname and timeout are always set (see the
        NOTE above this class about slow FQDN lookups).
        """
        if host is None:
            host = HOST
        smtp = smtplib.SMTP(host, self.port, local_hostname='localhost',
                            timeout=3, **kwargs)
        self.addCleanup(smtp.close)
        return smtp

    def _finish_session(self, smtp):
        """Quit *smtp*, stop the server thread and flush captured output."""
        # XXX(nnorwitz): these tests are flaky and die with a bad file
        # descriptor in asyncore.  This sleep might help, but should really
        # be fixed properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    def _expected_output(self, body):
        """Return the text the server should have printed for *body*."""
        return '%s%s\n%s' % (MSG_BEGIN, body, MSG_END)

    def _assert_message_output(self, msg):
        """Check that the server printed *msg*, ignoring the X-Peer header."""
        # Remove the X-Peer header that DebuggingServer adds as figuring out
        # exactly what IP address format is put there is not easy (and
        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
        # not always the same as socket.gethostbyname(HOST). :(
        test_output = self.get_output_without_xpeer()
        del msg['X-Peer']
        self.assertEqual(test_output, self._expected_output(msg.as_string()))

    def _assert_sender(self, sender):
        """Check that the channel debug output names *sender* as sender."""
        # The address is escaped so that '.' only matches a literal dot.
        pattern = re.compile("^sender: {}$".format(re.escape(sender)),
                             re.MULTILINE)
        self.assertRegex(smtpd.DEBUGSTREAM.getvalue(), pattern)

    def _assert_recips(self, addrs, present=True):
        """Check that each of *addrs* is (or is not) listed as a recipient."""
        debugout = smtpd.DEBUGSTREAM.getvalue()
        check = self.assertRegex if present else self.assertNotRegex
        for addr in addrs:
            pattern = re.compile(
                r"^recips: .*'{}'.*$".format(re.escape(addr)), re.MULTILINE)
            check(debugout, pattern)

    # ---- tests ----

    def testBasic(self):
        # connect
        smtp = self._connect()
        smtp.quit()

    def testSourceAddress(self):
        # connect
        src_port = support.find_unused_port()
        try:
            smtp = self._connect(self.host,
                                 source_address=(self.host, src_port))
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # The EHLO reply is expected to advertise SIZE and HELP.
        smtp = self._connect()
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = self._connect()
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._finish_session(smtp)
        self.assertEqual(self.output.getvalue(), self._expected_output(m))

    def testSendBinary(self):
        m = b'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._finish_session(smtp)
        self.assertEqual(self.output.getvalue(),
                         self._expected_output(m.decode('ascii')))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._finish_session(smtp)
        self.assertEqual(self.output.getvalue(), self._expected_output(m))

    def testSendNullSender(self):
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('<>', 'Sally', m)
        self._finish_session(smtp)
        self.assertEqual(self.output.getvalue(), self._expected_output(m))
        self._assert_sender('<>')

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = self._connect()
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._finish_session(smtp)
        self._assert_message_output(m)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = self._connect()
        smtp.send_message(m)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')
        self._finish_session(smtp)

        # The Bcc header should not be transmitted.
        del m['Bcc']
        self._assert_message_output(m)
        self._assert_sender('foo@bar.com')
        self._assert_recips(('John', 'Sally', 'Fred', 'root@localhost',
                             'warped@silly.walks.com'))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)

        self._assert_message_output(m)
        self._assert_sender('foo@bar.com')
        self._assert_recips(('John', 'Dinsdale'))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._finish_session(smtp)

        self._assert_message_output(m)
        self._assert_sender('joe@example.com')
        self._assert_recips(('John', 'Dinsdale'), present=False)
        self._assert_recips(('foo@example.net',))

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)

        self._assert_message_output(m)
        self._assert_sender('the_rescuers@Rescue-Aid-Society.com')
        self._assert_recips(('John', 'Dinsdale'))

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)

        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        self._assert_message_output(m)
        self._assert_sender('holy@grail.net')
        self._assert_recips(('my_mom@great.cooker.com', 'Jeff',
                             'doe@losthope.net'))

    def testSendMessageMultipleResentRaises(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = self._connect()
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
class NonConnectingTests(unittest.TestCase):
    """Checks on SMTP objects that never complete a connection."""

    def testNotConnected(self):
        # Operations on an unconnected SMTP object are expected to raise
        # SMTPServerDisconnected.
        client = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send('test msg')

    def testNonnumericPort(self):
        # A non-numeric port must raise OSError, whether it is passed
        # separately or embedded in the host string.
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost", "bogus")
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost:bogus")

    def testSockAttributeExists(self):
        # The sock attribute must be present outside of a connect() call
        # (regression test, the previous behavior raised an
        #  AttributeError: 'SMTP' object has no attribute 'sock')
        with smtplib.SMTP() as client:
            self.assertIsNone(client.sock)
614
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() hands on to sendmail()."""

    def setUp(self):
        # A message with a non-ASCII From header, plus an SMTP object whose
        # network-facing methods are replaced by mocks.
        message = EmailMessage()
        message['From'] = 'Páolo <főo@bar.com>'
        self.msg = message

        client = smtplib.SMTP()
        client.ehlo = Mock(return_value=(200, 'OK'))
        client.has_extn = Mock()
        client.sendmail = Mock()
        self.smtp = client

    def testSendMessage(self):
        # Sending the same message twice must yield the same options both
        # times.
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        for index in (0, 1):
            positional = self.smtp.sendmail.call_args_list[index][0]
            self.assertEqual(positional[3], expected_mail_options)

    def testSendMessageWithMailOptions(self):
        # The caller's list must be left untouched; the extra options are
        # appended to what is passed on to sendmail().
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        positional = self.smtp.sendmail.call_args_list[0][0]
        self.assertEqual(positional[3], expected_mail_options)
641
642# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """Client behaviour when the server's first reply is not a success."""

    def setUp(self):
        # Queue the bad reply on the mock socket and silence stdout.
        self.port = 25
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

    def tearDown(self):
        sys.stdout = self.old_stdout
        smtplib.socket = socket

    def testFailingHELO(self):
        # Constructing the client must fail with SMTPConnectError.
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                         timeout=3)
661
class TooLongLineTests(unittest.TestCase):
    """Client behaviour when the server sends an over-long reply line."""

    # A single reply line that is more than twice smtplib._MAXLINE long.
    respdata = b'250 OK' + b'.' * (2 * smtplib._MAXLINE) + b'\n'

    def setUp(self):
        self.thread_key = threading_setup()
        # Capture anything printed to stdout while the test runs.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        # Start server() in a thread on a freshly bound port.
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = support.bind_port(self.sock)
        self.thread = threading.Thread(
            target=server, args=(self.evt, self.respdata, self.sock))
        self.thread.start()
        # Wait until the server thread signals that it is listening.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Wait for the server thread to signal that it is done.
        self.evt.wait()
        sys.stdout = self.old_stdout
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        # Connecting must fail with SMTPResponseException.
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                         timeout=3)
693
# Address -> full name table consulted by the simulated server's VRFY.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@xn--fo-fka.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# (user, password) pair the simulated AUTH mechanisms accept.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Challenge string sent by the simulated CRAM-MD5 mechanism.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNndAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# List name -> member addresses table consulted by the simulated EXPN.
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@xn--fo-fka.com'],
}
705
706# Simulated SMTP channel & server
class ResponseException(Exception):
    """Error type caught by SimSMTPChannel.found_terminator during AUTH."""
    # NOTE(review): found_terminator reads e.smtp_code and e.smtp_error,
    # which this class does not define -- confirm that whoever raises it
    # sets both attributes.
708class SimSMTPChannel(smtpd.SMTPChannel):
709
710    quit_response = None
711    mail_response = None
712    rcpt_response = None
713    data_response = None
714    rcpt_count = 0
715    rset_count = 0
716    disconnect = 0
717    AUTH = 99    # Add protocol state to enable auth testing.
718    authenticated_user = None
719
720    def __init__(self, extra_features, *args, **kw):
721        self._extrafeatures = ''.join(
722            [ "250-{0}\r\n".format(x) for x in extra_features ])
723        super(SimSMTPChannel, self).__init__(*args, **kw)
724
725    # AUTH related stuff.  It would be nice if support for this were in smtpd.
726    def found_terminator(self):
727        if self.smtp_state == self.AUTH:
728            line = self._emptystring.join(self.received_lines)
729            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
730            self.received_lines = []
731            try:
732                self.auth_object(line)
733            except ResponseException as e:
734                self.smtp_state = self.COMMAND
735                self.push('%s %s' % (e.smtp_code, e.smtp_error))
736                return
737        super().found_terminator()
738
739
740    def smtp_AUTH(self, arg):
741        if not self.seen_greeting:
742            self.push('503 Error: send EHLO first')
743            return
744        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
745            self.push('500 Error: command "AUTH" not recognized')
746            return
747        if self.authenticated_user is not None:
748            self.push(
749                '503 Bad sequence of commands: already authenticated')
750            return
751        args = arg.split()
752        if len(args) not in [1, 2]:
753            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
754            return
755        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
756        try:
757            self.auth_object = getattr(self, auth_object_name)
758        except AttributeError:
759            self.push('504 Command parameter not implemented: unsupported '
760                      ' authentication mechanism {!r}'.format(auth_object_name))
761            return
762        self.smtp_state = self.AUTH
763        self.auth_object(args[1] if len(args) == 2 else None)
764
765    def _authenticated(self, user, valid):
766        if valid:
767            self.authenticated_user = user
768            self.push('235 Authentication Succeeded')
769        else:
770            self.push('535 Authentication credentials invalid')
771        self.smtp_state = self.COMMAND
772
773    def _decode_base64(self, string):
774        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
775
776    def _auth_plain(self, arg=None):
777        if arg is None:
778            self.push('334 ')
779        else:
780            logpass = self._decode_base64(arg)
781            try:
782                *_, user, password = logpass.split('\0')
783            except ValueError as e:
784                self.push('535 Splitting response {!r} into user and password'
785                          ' failed: {}'.format(logpass, e))
786                return
787            self._authenticated(user, password == sim_auth[1])
788
789    def _auth_login(self, arg=None):
790        if arg is None:
791            # base64 encoded 'Username:'
792            self.push('334 VXNlcm5hbWU6')
793        elif not hasattr(self, '_auth_login_user'):
794            self._auth_login_user = self._decode_base64(arg)
795            # base64 encoded 'Password:'
796            self.push('334 UGFzc3dvcmQ6')
797        else:
798            password = self._decode_base64(arg)
799            self._authenticated(self._auth_login_user, password == sim_auth[1])
800            del self._auth_login_user
801
802    def _auth_cram_md5(self, arg=None):
803        if arg is None:
804            self.push('334 {}'.format(sim_cram_md5_challenge))
805        else:
806            logpass = self._decode_base64(arg)
807            try:
808                user, hashed_pass = logpass.split()
809            except ValueError as e:
810                self.push('535 Splitting response {!r} into user and password '
811                          'failed: {}'.format(logpass, e))
812                return False
813            valid_hashed_pass = hmac.HMAC(
814                sim_auth[1].encode('ascii'),
815                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
816                'md5').hexdigest()
817            self._authenticated(user, hashed_pass == valid_hashed_pass)
818    # end AUTH related stuff.
819
820    def smtp_EHLO(self, arg):
821        resp = ('250-testhost\r\n'
822                '250-EXPN\r\n'
823                '250-SIZE 20000000\r\n'
824                '250-STARTTLS\r\n'
825                '250-DELIVERBY\r\n')
826        resp = resp + self._extrafeatures + '250 HELP'
827        self.push(resp)
828        self.seen_greeting = arg
829        self.extended_smtp = True
830
831    def smtp_VRFY(self, arg):
832        # For max compatibility smtplib should be sending the raw address.
833        if arg in sim_users:
834            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
835        else:
836            self.push('550 No such user: %s' % arg)
837
838    def smtp_EXPN(self, arg):
839        list_name = arg.lower()
840        if list_name in sim_lists:
841            user_list = sim_lists[list_name]
842            for n, user_email in enumerate(user_list):
843                quoted_addr = smtplib.quoteaddr(user_email)
844                if n < len(user_list) - 1:
845                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
846                else:
847                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
848        else:
849            self.push('550 No access for you!')
850
851    def smtp_QUIT(self, arg):
852        if self.quit_response is None:
853            super(SimSMTPChannel, self).smtp_QUIT(arg)
854        else:
855            self.push(self.quit_response)
856            self.close_when_done()
857
858    def smtp_MAIL(self, arg):
859        if self.mail_response is None:
860            super().smtp_MAIL(arg)
861        else:
862            self.push(self.mail_response)
863            if self.disconnect:
864                self.close_when_done()
865
866    def smtp_RCPT(self, arg):
867        if self.rcpt_response is None:
868            super().smtp_RCPT(arg)
869            return
870        self.rcpt_count += 1
871        self.push(self.rcpt_response[self.rcpt_count-1])
872
873    def smtp_RSET(self, arg):
874        self.rset_count += 1
875        super().smtp_RSET(arg)
876
877    def smtp_DATA(self, arg):
878        if self.data_response is None:
879            super().smtp_DATA(arg)
880        else:
881            self.push(self.data_response)
882
    def handle_error(self):
        # Re-raise the exception that is currently being handled rather
        # than swallowing it -- presumably so that errors inside the
        # simulated channel are not silently hidden from the test run.
        raise
885
886
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that creates its channels from ``channel_class``.

    It keeps the list of extra EHLO features handed to every new channel,
    remembers the most recently created channel in ``_SMTPchannel`` and
    records the envelope addresses of the last processed message in
    ``_addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._extra_features, self._addresses = [], {}
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep a reference to the channel so tests can poke at it.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope is recorded; the message data is ignored.
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        # Mutate the list in place: it is the same object that is passed
        # to the channels created by handle_accepted().
        features = self._extra_features
        features.append(feature)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
910
911
912# Test various SMTP & ESMTP commands/behaviors that require a simulated server
913# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    # Every test talks to a SimSMTPServer that is run by debugging_server()
    # in a background thread.  Client connections are registered with
    # addCleanup() so that the client socket is closed even when a test
    # fails before reaching its own quit()/close() call; SMTP.close() may
    # safely be called more than once.

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client sockets) before checking for
        # dangling threads.
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @requires_hashdigest('md5')
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 is only exercised when hashlib can provide md5.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = smtplib.SMTP(HOST, self.port,
                                    local_hostname='localhost', timeout=15)
                self.addCleanup(smtp.close)
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost',
                            timeout=15)
        self.addCleanup(smtp.close)
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                # The QUIT sent when the with block exits gets this reply.
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        # noop() makes a round trip, so the server-side channel exists
        # before its canned responses are set below.
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        # Channel that answers the message data itself with a 421 reply.
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # NOTE(review): '\u1F609' is U+1F60 followed by the digit 9, not
        # U+1F609; left as is because it is still non-ASCII.
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3
        )
        self.addCleanup(smtp.close)

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        # Only the bare addresses, without display names, reach the server.
        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1180
1181
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    The details of the last processed message, including the MAIL and
    RCPT options, are stored in ``last_*`` attributes for inspection.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server turns these on automatically, but our test
        # server is set up to munge the EHLO response, so we need to provide
        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data,
                        mail_options=None, rcpt_options=None):
        # Remember everything about the message so tests can inspect it.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1206
1207
class SMTPUTF8SimTests(unittest.TestCase):
    # Tests that need a server advertising SMTPUTF8: each test talks to a
    # SimSMTPUTF8Server run by debugging_server() in a background thread.

    # Show full diffs when the message comparison below fails.
    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client sockets) before checking for
        # dangling threads.
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def test_test_server_supports_extensions(self):
        # Sanity check of the test server itself.
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertTrue(smtp.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.sendmail('Jőhn', 'Sálly', m,
                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        # The server must have seen the addresses and the body unchanged.
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        # Same as above, but issuing MAIL, RCPT and DATA one by one.
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertEqual(
            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
            (250, b'OK'))
        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
        self.assertEqual(smtp.data(m), (250, b'OK'))
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # NOTE(review): '\u1F609' is U+1F60 followed by the digit 9, not
        # U+1F609; the expected text below uses the same escape.
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received \r\n to \n, so we can't easily test that
        # we are successfully sending \r\n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        self.assertEqual(smtp.send_message(msg), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])
1313
1314
# Base64 form (without a trailing newline) of the NUL-separated PLAIN
# response the initial-response tests below expect from the client.
EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1316
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    """Channel that accepts only AUTH PLAIN with the expected initial response."""

    def smtp_AUTH(self, arg):
        """Reply 235 to ``AUTH PLAIN <EXPECTED_RESPONSE>`` and 571 otherwise.

        A missing or empty *arg* is treated like any other unacceptable
        request instead of raising.
        """
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN does support it, so test that here.  See issue #15014.
        args = (arg or '').split()
        # AUTH PLAIN <initial-response> with the response base 64
        # encoded.  Hard code the expected response for the test.
        if (len(args) == 2
                and args[0].lower() == 'plain'
                and args[1] == EXPECTED_RESPONSE):
            self.push('235 Ok')
            return
        self.push('571 Bad authentication')
1331
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    # Same server, but its channels are built from the initial-response
    # channel class.
    channel_class = SimSMTPAUTHInitialResponseChannel
1334
1335
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    # Tests that smtplib sends the PLAIN credentials as an initial
    # response on the AUTH command line.  The server used here replies
    # 235 only to "AUTH PLAIN <EXPECTED_RESPONSE>".

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client sockets) before checking for
        # dangling threads.
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        # Close the client socket even if login() raises.
        self.addCleanup(smtp.close)
        code, response = smtp.login('psu', 'doesnotexist')
        smtp.close()
        self.assertEqual(code, 235)

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        # Close the client socket even if auth() raises.
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, response = smtp.auth('plain', smtp.auth_plain)
        smtp.close()
        self.assertEqual(code, 235)
1383
1384
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
1387