• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hashlib
8import hmac
9import socket
10import smtpd
11import smtplib
12import io
13import re
14import sys
15import time
16import select
17import errno
18import textwrap
19import threading
20
21import unittest
22from test import support, mock_socket
23from test.support import hashlib_helper
24from test.support import socket_helper
25from test.support import threading_setup, threading_cleanup, join_thread
26from unittest.mock import Mock
27
# Host used throughout this file: the test servers bind to it and the
# smtplib clients connect to it.
HOST = socket_helper.HOST
29
if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    # The no-op below replaces SMTPChannel.handle_expt for every channel
    # created in this process, not just for a single test.
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt
36
37
def server(evt, buf, serv):
    """Accept a single connection on *serv* and send it the bytes in *buf*.

    *evt* is set once the socket is listening and set again when the
    function is finished.  A timeout while waiting for the connection is
    ignored.  At most 500 send attempts are made, so a stalled peer cannot
    keep the thread alive forever.  *serv* is always closed on exit.
    """
    serv.listen()
    evt.set()
    try:
        try:
            peer, _peer_addr = serv.accept()
        except socket.timeout:
            # Nobody connected in time; there is nothing to send.
            return

        pending = buf
        attempts_left = 500
        while pending and attempts_left > 0:
            _, writable, _ = select.select([], [peer], [])
            if writable:
                # send() may accept only part of the data; keep the rest.
                pending = pending[peer.send(pending):]
            attempts_left -= 1

        peer.close()
    finally:
        serv.close()
        evt.set()
59
class GeneralTests:
    """Connection-level tests shared by the smtplib client classes.

    Concrete subclasses provide the class under test as ``client``.  While
    a test runs, ``smtplib.socket`` is replaced by ``mock_socket``.
    """

    def setUp(self):
        # Route smtplib's socket calls through the mock module.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        # Give smtplib the real socket module back.
        smtplib.socket = socket

    def _connect(self, *args, **kwargs):
        """Queue the standard 220 greeting, then build and return a client."""
        mock_socket.reply_with(b"220 Hola mundo")
        return self.client(*args, **kwargs)

    # quotedata() is no longer used but is kept for backward compatibility,
    # so make sure it still behaves.
    def testQuoteData(self):
        raw = "abc\n.jkl\rfoo\r\n..blue"
        quoted = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(quoted, smtplib.quotedata(raw))

    def testBasic1(self):
        # Plain connect with host and port given separately.
        smtp = self._connect(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        # The requested source address is recorded on the client.
        smtp = self._connect(HOST, self.port,
                             source_address=('127.0.0.1', 19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        # Connect with the port embedded in the host string.
        smtp = self._connect(f"{HOST}:{self.port}")
        smtp.close()

    def testLocalHostName(self):
        # An explicit local_hostname must be kept as given.
        smtp = self._connect(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        # With no timeout argument the module-wide default applies.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = self.client(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        # timeout=None means "no timeout", whatever the default is.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = self.client(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutZero(self):
        # A zero timeout is rejected.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertRaises(ValueError, self.client,
                          HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        # An explicit timeout ends up on the socket.
        smtp = self._connect(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        # Level 1 logs the connect step to stderr.
        mock_socket.reply_with(b"220 Hello world")
        smtp = self.client()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as captured:
            smtp.connect(HOST, self.port)
        smtp.close()
        self.assertRegex(captured.getvalue(),
                         re.compile(r"^connect:", re.MULTILINE))

    def test_debuglevel_2(self):
        # Level 2 prefixes each debug line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        smtp = self.client()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as captured:
            smtp.connect(HOST, self.port)
        smtp.close()
        stamped = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                             re.MULTILINE)
        self.assertRegex(captured.getvalue(), stamped)
157
158
class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the shared GeneralTests suite with smtplib.SMTP as the client."""

    client = smtplib.SMTP
162
163
class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the shared GeneralTests suite with smtplib.LMTP as the client.

    Adds checks for the case where the host is a filesystem path.
    """

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        # Connecting through a socket path leaves the timeout unset.
        socket_path = '/some/local/lmtp/delivery/program'
        mock_socket.reply_with(b"220 Hello world")
        try:
            lmtp = self.client(socket_path, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(lmtp.sock.gettimeout())
        lmtp.close()

    def testTimeoutZero(self):
        # Besides the inherited check, a zero timeout is also rejected
        # when the host is a socket path.
        super().testTimeoutZero()
        socket_path = '/some/local/lmtp/delivery/program'
        self.assertRaises(ValueError, self.client, socket_path, timeout=0)
184
185# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Run the asyncore loop for *serv* until the client says it is done.

    *serv_evt* is set on entry and again on exit.  The loop polls
    asyncore's socket map at most 1000 times and stops early when the map
    is empty or *client_evt* is set.  On exit every asyncore channel is
    closed.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based loop where the platform has poll().
        poll_once = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_once(0.01, asyncore.socket_map)

            # The client sets client_evt when its conversation is over;
            # from then on it is safe to shut the server down.
            if client_evt.is_set():
                serv.close()
                break
    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # Leave the client a moment to read the last reply.
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
216
# Banner lines that frame a message in the captured server output; the
# tests build their expected output as MSG_BEGIN + message + '\n' + MSG_END.
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
219
220# NOTE: Some SMTP objects in the tests below are created with a non-default
221# local_hostname argument to the constructor, since (on some systems) the FQDN
222# lookup caused by the default local_hostname sometimes takes so long that the
223# test server times out, causing the test to fail.
224
225# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    """Run smtplib.SMTP against an smtpd.DebuggingServer in a helper thread.

    sys.stdout and smtpd.DEBUGSTREAM are redirected to StringIO objects
    for the duration of each test so the server's output can be checked.
    """

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # DebuggingServer output goes to sys.stdout; capture it.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # SMTPChannel debug output goes to smtpd.DEBUGSTREAM; capture it too.
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Port 0 asks for any unused port.
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Remember the host and port the server was actually given.
        self.host, self.port = self.serv.socket.getsockname()[:2]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Wait for the server thread to report that it has started.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Signal that the client is done, then wait for the server thread.
        self.client_evt.set()
        self.serv_evt.wait()
        join_thread(self.thread)
        # Undo the stdout and DEBUGSTREAM redirections.
        sys.stdout = self.old_stdout
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    # ---- helpers -------------------------------------------------------

    def get_output_without_xpeer(self):
        """Return the captured output with the first X-Peer header removed."""
        xpeer_line = re.compile(r'(.*?)^X-Peer:\s*\S+\n(.*)',
                                re.MULTILINE | re.DOTALL)
        return xpeer_line.sub(r'\1\2', self.output.getvalue())

    def _open_client(self):
        """Connect to the test server; the client is closed during cleanup."""
        client = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                              timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        return client

    def _quit(self, client):
        """End the session after a short pause."""
        # XXX(nnorwitz): without the pause the send tests are flaky and die
        # with a bad file descriptor in asyncore.  This should really be
        # fixed properly by using an Event variable.
        time.sleep(0.01)
        client.quit()

    def _await_server(self):
        """Tell the server thread to stop and wait until it has finished."""
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    def _quit_and_await_server(self, client):
        """Run _quit() followed by _await_server()."""
        self._quit(client)
        self._await_server()

    @staticmethod
    def _framed(payload):
        """Return *payload* wrapped in the DebuggingServer banner lines."""
        return '%s%s\n%s' % (MSG_BEGIN, payload, MSG_END)

    def _assert_delivered(self, msg):
        """Check that the server printed *msg* (ignoring X-Peer)."""
        # The X-Peer header that DebuggingServer adds is stripped because
        # the exact IP address format it contains is hard to predict (and
        # irrelevant here).  Typically 127.0.0.1 or ::1, but it is not
        # always the same as socket.gethostbyname(HOST).
        received = self.get_output_without_xpeer()
        del msg['X-Peer']
        self.assertEqual(received, self._framed(msg.as_string()))

    def _assert_envelope(self, sender, recipients=(), absent=()):
        """Check the sender/recipient lines written to smtpd.DEBUGSTREAM.

        *recipients* must each appear on a "recips:" line; *absent* must
        not appear on any.
        """
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: %s$" % sender, re.MULTILINE))
        for addr in absent:
            self.assertNotRegex(
                debugout,
                re.compile(r"^recips: .*'{}'.*$".format(addr), re.MULTILINE))
        for addr in recipients:
            self.assertRegex(
                debugout,
                re.compile(r"^recips: .*'{}'.*$".format(addr), re.MULTILINE))

    # ---- protocol commands ---------------------------------------------

    def testBasic(self):
        # Connect and leave again.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.quit()

    def testSourceAddress(self):
        # Connect from an explicitly chosen local port.
        src_port = socket_helper.find_unused_port()
        source = (self.host, src_port)
        try:
            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
                                timeout=support.LOOPBACK_TIMEOUT,
                                source_address=source)
            self.addCleanup(smtp.close)
            self.assertEqual(smtp.source_address, source)
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = self._open_client()
        self.assertEqual(smtp.noop(), (250, b'OK'))
        smtp.quit()

    def testRSET(self):
        smtp = self._open_client()
        self.assertEqual(smtp.rset(), (250, b'OK'))
        smtp.quit()

    def testELHO(self):
        # EHLO is answered with 250 plus the SIZE and HELP extension lines.
        smtp = self._open_client()
        self.assertEqual(smtp.ehlo(), (250, b'\nSIZE 33554432\nHELP'))
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer.
        smtp = self._open_client()
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), (502, b'EXPN not implemented'))
        smtp.quit()

    def testVRFY(self):
        # vrfy() and its alias verify() get the same reply.
        smtp = self._open_client()
        reply = (252, b'Cannot VRFY user, but will accept message '
                      b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), reply)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), reply)
        smtp.quit()

    def testSecondHELO(self):
        # A second HELO is reported as a duplicate (this behavior is
        # specific to smtpd.SMTPChannel).
        smtp = self._open_client()
        smtp.helo()
        self.assertEqual(smtp.helo(), (503, b'Duplicate HELO/EHLO'))
        smtp.quit()

    def testHELP(self):
        smtp = self._open_client()
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL '
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    # ---- sendmail() ----------------------------------------------------

    def testSend(self):
        # Connect and send a text message.
        body = 'A test message'
        smtp = self._open_client()
        smtp.sendmail('John', 'Sally', body)
        self._quit_and_await_server(smtp)
        self.assertEqual(self.output.getvalue(), self._framed(body))

    def testSendBinary(self):
        body = b'A test message'
        smtp = self._open_client()
        smtp.sendmail('John', 'Sally', body)
        self._quit_and_await_server(smtp)
        self.assertEqual(self.output.getvalue(),
                         self._framed(body.decode('ascii')))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        body = '.A test\n.mes.sage.'
        smtp = self._open_client()
        smtp.sendmail('John', 'Sally', body)
        self._quit_and_await_server(smtp)
        self.assertEqual(self.output.getvalue(), self._framed(body))

    def testSendNullSender(self):
        body = 'A test message'
        smtp = self._open_client()
        smtp.sendmail('<>', 'Sally', body)
        self._quit_and_await_server(smtp)
        self.assertEqual(self.output.getvalue(), self._framed(body))
        self._assert_envelope('<>')

    # ---- send_message() ------------------------------------------------

    def testSendMessage(self):
        msg = email.mime.text.MIMEText('A test message')
        smtp = self._open_client()
        smtp.send_message(msg, from_addr='John', to_addrs='Sally')
        self._quit_and_await_server(smtp)
        self._assert_delivered(msg)

    def testSendMessageWithAddresses(self):
        msg = email.mime.text.MIMEText('A test message')
        msg['From'] = 'foo@bar.com'
        msg['To'] = 'John'
        msg['CC'] = 'Sally, Fred'
        msg['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = self._open_client()
        smtp.send_message(msg)
        self._quit(smtp)
        # The caller's message must still carry its Bcc header.
        self.assertEqual(msg['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                      '<warped@silly.walks.com>')

        self._await_server()
        # The Bcc header should not be transmitted.
        del msg['Bcc']
        self._assert_delivered(msg)
        self._assert_envelope(
            'foo@bar.com',
            recipients=('John', 'Sally', 'Fred', 'root@localhost',
                        'warped@silly.walks.com'))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        msg = email.mime.text.MIMEText('A test message')
        msg['From'] = 'foo@bar.com'
        msg['To'] = 'John, Dinsdale'
        smtp = self._open_client()
        smtp.send_message(msg)
        self._quit_and_await_server(smtp)
        self._assert_delivered(msg)
        self._assert_envelope('foo@bar.com', recipients=('John', 'Dinsdale'))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        msg = email.mime.text.MIMEText('A test message')
        msg['From'] = 'foo@bar.com'
        msg['To'] = 'John, Dinsdale'
        smtp = self._open_client()
        smtp.send_message(msg, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._quit_and_await_server(smtp)
        self._assert_delivered(msg)
        self._assert_envelope('joe@example.com',
                              recipients=('foo@example.net',),
                              absent=('John', 'Dinsdale'))

    def testSendMessageWithMultipleFrom(self):
        # With several From addresses, the Sender header supplies the
        # envelope sender.
        msg = email.mime.text.MIMEText('A test message')
        msg['From'] = 'Bernard, Bianca'
        msg['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        msg['To'] = 'John, Dinsdale'
        smtp = self._open_client()
        smtp.send_message(msg)
        self._quit_and_await_server(smtp)
        self._assert_delivered(msg)
        self._assert_envelope('the_rescuers@Rescue-Aid-Society.com',
                              recipients=('John', 'Dinsdale'))

    def testSendMessageResent(self):
        msg = email.mime.text.MIMEText('A test message')
        for header, value in (
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'),
            ('Resent-Date', 'Thu, 1 Jan 1970 17:42:00 +0000'),
            ('Resent-From', 'holy@grail.net'),
            ('Resent-To', 'Martha <my_mom@great.cooker.com>, Jeff'),
            ('Resent-Bcc', 'doe@losthope.net'),
        ):
            msg[header] = value
        smtp = self._open_client()
        smtp.send_message(msg)
        self._quit_and_await_server(smtp)
        # The Bcc and Resent-Bcc headers are deleted before serialization.
        del msg['Bcc']
        del msg['Resent-Bcc']
        self._assert_delivered(msg)
        self._assert_envelope(
            'holy@grail.net',
            recipients=('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'))

    def testSendMessageMultipleResentRaises(self):
        # A message with more than one Resent- block is rejected.
        msg = email.mime.text.MIMEText('A test message')
        for header, value in (
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'),
            ('Resent-Date', 'Thu, 1 Jan 1970 17:42:00 +0000'),
            ('Resent-From', 'holy@grail.net'),
            ('Resent-To', 'Martha <my_mom@great.cooker.com>, Jeff'),
            ('Resent-Bcc', 'doe@losthope.net'),
            ('Resent-Date', 'Thu, 2 Jan 1970 17:42:00 +0000'),
            ('Resent-To', 'holy@grail.net'),
            ('Resent-From', 'Martha <my_mom@great.cooker.com>, Jeff'),
        ):
            msg[header] = value
        smtp = self._open_client()
        with self.assertRaises(ValueError):
            smtp.send_message(msg)
        smtp.close()
639
class NonConnectingTests(unittest.TestCase):
    """Checks on SMTP objects that never reach a server."""

    def testNotConnected(self):
        # Operations on a client that was never connected must be reported
        # as SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.send('test msg')

    def testNonnumericPort(self):
        # A non-numeric port raises OSError, whether it is passed as its
        # own argument or embedded in the host string.
        for ctor_args in (("localhost", "bogus"), ("localhost:bogus",)):
            with self.assertRaises(OSError):
                smtplib.SMTP(*ctor_args)

    def testSockAttributeExists(self):
        # The sock attribute exists (as None) before any connect() call.
        # Regression test: it used to be missing, giving
        # AttributeError: 'SMTP' object has no attribute 'sock'.
        with smtplib.SMTP() as unconnected:
            self.assertIsNone(unconnected.sock)
665
666
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() passes on to sendmail().

    The message has a non-ASCII From address, and ehlo, has_extn and
    sendmail are replaced by Mock objects, so no server is needed.
    """

    def setUp(self):
        message = EmailMessage()
        message['From'] = 'Páolo <főo@bar.com>'
        self.msg = message

        client = smtplib.SMTP()
        client.ehlo = Mock(return_value=(200, 'OK'))
        client.has_extn = Mock()
        client.sendmail = Mock()
        self.smtp = client

    def testSendMessage(self):
        # Two calls in a row must each get the same options.
        wanted = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        recorded = self.smtp.sendmail.call_args_list
        for call_index in (0, 1):
            # Positional argument 3 of sendmail() is mail_options.
            self.assertEqual(recorded[call_index][0][3], wanted)

    def testSendMessageWithMailOptions(self):
        # The caller's own list must not be modified.
        mail_options = ['STARTTLS']
        wanted = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        first_call = self.smtp.sendmail.call_args_list[0]
        self.assertEqual(first_call[0][3], wanted)
692
693
694# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """Connecting must fail when the server greets with a non-220 reply."""

    def setUp(self):
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        # Let smtplib talk to the mock socket, which answers with a 199.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.port = 25

    def tearDown(self):
        sys.stdout = self.old_stdout
        smtplib.socket = socket

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
712
713
class TooLongLineTests(unittest.TestCase):
    """The client must reject a reply line longer than smtplib._MAXLINE."""

    # A single reply line with twice the permitted number of filler bytes.
    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.thread_key = threading_setup()
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        # Start a one-shot server thread that sends respdata to whoever
        # connects.
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = socket_helper.bind_port(self.sock)
        self.thread = threading.Thread(
            target=server, args=(self.evt, self.respdata, self.sock))
        self.thread.start()
        # Wait until the server is listening.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Wait until the server function has finished.
        self.evt.wait()
        sys.stdout = self.old_stdout
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
744
745
# Fixture data for the simulated server.  How each table is consumed is
# defined by code outside this excerpt.

# Address -> display name.
sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

# (user, password) pair; presumably the credentials the simulated server
# accepts -- TODO confirm against the code that reads it.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Base64 text; going by the name, the CRAM-MD5 challenge -- TODO confirm.
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# List name -> member addresses; every member is also a key of sim_users.
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }
757
758# Simulated SMTP channel & server
class ResponseException(Exception):
    """Error that SimSMTPChannel.found_terminator turns into an SMTP reply.

    The handler reads ``smtp_code`` and ``smtp_error`` from the instance;
    this class does not define them, so the raiser has to set them.
    """
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd channel with scriptable replies and a minimal AUTH command.

    Tests steer a live channel by assigning the ``*_response`` attributes
    below; while an attribute is None the matching command is handled by
    smtpd.SMTPChannel.
    """

    quit_response = None
    mail_response = None
    # Sequence of replies: the n-th RCPT command gets element n-1.
    rcpt_response = None
    data_response = None
    rcpt_count = 0      # RCPT commands answered from rcpt_response
    rset_count = 0      # RSET commands received
    disconnect = 0      # truthy: close after pushing mail_response
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        """Create the channel.

        extra_features is an iterable of EHLO keywords (e.g. 'AUTH PLAIN')
        advertised in addition to the fixed ones; the remaining arguments
        are passed through to smtpd.SMTPChannel.
        """
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(x) for x in extra_features)
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        """Route a complete input line.

        While an AUTH exchange is in progress the line is a client response
        and goes to the active mechanism; otherwise smtpd handles it.
        """
        if self.smtp_state == self.AUTH:
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            # The line has been consumed by the AUTH exchange.  Handing
            # the (now empty) buffer on to the base class would make it
            # push a second, bogus reply, so always stop here.
            return
        super().found_terminator()

    def smtp_AUTH(self, arg):
        """Handle ``AUTH <mechanism> [initial-response]``."""
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        # arg may be None/empty for a bare "AUTH"; treat that as a syntax
        # error instead of failing on None.split().
        args = arg.split() if arg else []
        if len(args) not in [1, 2]:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        # 'CRAM-MD5' -> '_auth_cram_md5'
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      ' authentication mechanism {!r}'.format(auth_object_name))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        """Finish an AUTH exchange: reply 235 or 535 and leave AUTH state."""
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        """Return the text encoded in the base64 str *string*."""
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        """AUTH PLAIN: *arg* is the base64 response, or None to prompt."""
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                # Fields are NUL separated; only the last two are used.
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                # The exchange is over; do not stay in the AUTH state or
                # the next command would be read as an AUTH response.
                self.smtp_state = self.COMMAND
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        """AUTH LOGIN: called once per step (prompt, user name, password)."""
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            # Forget the user so that a later exchange starts afresh.
            del self._auth_login_user

    def _auth_cram_md5(self, arg=None):
        """AUTH CRAM-MD5: send the challenge, then check '<user> <digest>'."""
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                # See _auth_plain: a failed exchange must leave AUTH state.
                self.smtp_state = self.COMMAND
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                return
            # Expected digest: HMAC-MD5 of the decoded challenge, keyed
            # with the password.
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        """Reply with the fixed feature list plus the extra features."""
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        """Reply 250 with name and address if *arg* is in sim_users."""
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        """Reply with one line per member of the list named *arg*."""
        list_name = arg.lower()
        if list_name not in sim_lists:
            self.push('550 No access for you!')
            return
        user_list = sim_lists[list_name]
        last = len(user_list) - 1
        for n, user_email in enumerate(user_list):
            quoted_addr = smtplib.quoteaddr(user_email)
            # Every line but the last is a '250-' continuation line.
            if n < last:
                self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
            else:
                self.push('250 %s %s' % (sim_users[user_email], quoted_addr))

    def smtp_QUIT(self, arg):
        """Send quit_response (if set) and close, else defer to smtpd."""
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        """Send mail_response (if set), optionally disconnecting after it."""
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        """Send the next entry of rcpt_response (if set), else defer."""
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        """Count the RSET command, then let smtpd handle it."""
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        """Send data_response (if set), else defer to smtpd."""
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Propagate the exception currently being handled so that a
        # failure inside the channel is not swallowed.
        raise
937
938
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that hands every connection to a SimSMTPChannel.

    The most recently created channel is kept in ``_SMTPchannel`` and the
    envelope of the last processed message in ``_addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        # Set up our own state first, then let smtpd do its initialisation.
        self._extra_features = []
        self._addresses = {}
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # Build a channel for the new connection and remember it.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope is recorded; peer and data are ignored.
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        # The list is handed to each channel created by handle_accepted.
        self._extra_features += [feature]

    def handle_error(self):
        # Propagate the exception currently being handled.
        raise
962
963
964# Test various SMTP & ESMTP commands/behaviors that require a simulated server
965# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _make_client(self):
        # Connect a client to the test server.  close() is registered as a
        # cleanup so the socket is released even when the test fails
        # before reaching its own quit()/close(); calling close() a second
        # time is harmless.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._make_client()
        smtp.quit()

    def testEHLO(self):
        smtp = self._make_client()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._make_client()

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._make_client()

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._make_client()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._make_client()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @hashlib_helper.requires_hashdigest('md5')
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._make_client()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @hashlib_helper.requires_hashdigest('md5')
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = self._make_client()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 can only be exercised when md5 is available.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = self._make_client()
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = self._make_client()
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        # The client is built directly here: the context manager itself is
        # what is under test.
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = self._make_client()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = self._make_client()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = self._make_client()
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        # Must be installed before connecting: the server instantiates
        # channel_class when it accepts the connection.
        self.serv.channel_class = MySimSMTPChannel
        smtp = self._make_client()
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = self._make_client()
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = self._make_client()
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # U+1F609 lies outside the BMP, so it needs the 8-digit \U escape
        # ('\u1F609' would be U+1F60 followed by the digit 9).
        msg['Subject'] = 'Nudge nudge, wink, wink \U0001F609'
        smtp = self._make_client()
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = self._make_client()

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1249
1250
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    The arguments of the last processed message are kept in the
    ``last_*`` attributes.
    """

    def __init__(self, *args, **kw):
        # smtpd would announce these two extensions by itself, but the
        # simulated channel writes its own EHLO reply, so they have to be
        # listed explicitly.  SimSMTPServer.__init__ is bypassed on
        # purpose: the base smtpd.SMTPServer initialiser is called
        # directly (as a consequence ``_addresses`` is not created here).
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Same as the parent, plus the server's SMTPUTF8 setting.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data,
            enable_SMTPUTF8=self.enable_SMTPUTF8,
        )
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                                                             rcpt_options=None):
        # Record every argument so that tests can inspect the delivery.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options, self.last_rcpt_options = (
            mail_options, rcpt_options)
1275
1276
class SMTPUTF8SimTests(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _make_client(self):
        # Connect a client to the test server and make sure it is closed
        # when the test ends, whatever the outcome.
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def test_test_server_supports_extensions(self):
        smtp = self._make_client()
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertTrue(smtp.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = self._make_client()
        smtp.sendmail('Jőhn', 'Sálly', m,
                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = self._make_client()
        smtp.ehlo()
        self.assertEqual(
            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
            (250, b'OK'))
        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
        self.assertEqual(smtp.data(m), (250, b'OK'))
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # U+1F609 lies outside the BMP, so it needs the 8-digit \U escape
        # ('\u1F609' would be U+1F60 followed by the digit 9).  The
        # expected text below uses the same escape.
        msg['Subject'] = 'Nudge nudge, wink, wink \U0001F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \U0001F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        smtp = self._make_client()
        self.assertEqual(smtp.send_message(msg), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])
1386
1387
# Base64 text (no line ending appended) of the PLAIN credentials
# NUL 'psu' NUL 'doesnotexist'.  SimSMTPAUTHInitialResponseChannel compares
# the client's initial response against this value.
EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1389
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    """Channel that only accepts AUTH PLAIN sent with an initial response."""

    def smtp_AUTH(self, arg):
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN does support it, so test that here.  See issue #15014.
        #
        # arg may be None/empty for a bare "AUTH"; that is answered with
        # the generic failure below rather than an exception.
        args = arg.split() if arg else []
        # AUTH PLAIN <initial-response> with the response base 64
        # encoded.  Hard code the expected response for the test.
        if (len(args) == 2 and args[0].lower() == 'plain'
                and args[1] == EXPECTED_RESPONSE):
            self.push('235 Ok')
            return
        self.push('571 Bad authentication')
1404
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    """SimSMTPServer whose channels are SimSMTPAUTHInitialResponseChannel."""

    channel_class = SimSMTPAUTHInitialResponseChannel
1407
1408
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _make_client(self):
        # Connect a client to the test server.  close() is registered as a
        # cleanup so the socket is released even if login()/auth() raises;
        # calling close() a second time is harmless.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = self._make_client()
        code, response = smtp.login('psu', 'doesnotexist')
        smtp.close()
        self.assertEqual(code, 235)

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = self._make_client()
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, response = smtp.auth('plain', smtp.auth_plain)
        smtp.close()
        self.assertEqual(code, 235)
1456
1457
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
1460