• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hmac
8import socket
9import smtpd
10import smtplib
11import io
12import re
13import sys
14import time
15import select
16import errno
17import textwrap
18
19import unittest
20from test import support, mock_socket
21
22try:
23    import threading
24except ImportError:
25    threading = None
26
27HOST = support.HOST
28
29if sys.platform == 'darwin':
30    # select.poll returns a select.POLLHUP at the end of the tests
31    # on darwin, so just ignore it
32    def handle_expt(self):
33        pass
34    smtpd.SMTPChannel.handle_expt = handle_expt
35
36
37def server(evt, buf, serv):
38    serv.listen()
39    evt.set()
40    try:
41        conn, addr = serv.accept()
42    except socket.timeout:
43        pass
44    else:
45        n = 500
46        while buf and n > 0:
47            r, w, e = select.select([], [conn], [])
48            if w:
49                sent = conn.send(buf)
50                buf = buf[sent:]
51
52            n -= 1
53
54        conn.close()
55    finally:
56        serv.close()
57        evt.set()
58
59class GeneralTests(unittest.TestCase):
60
61    def setUp(self):
62        smtplib.socket = mock_socket
63        self.port = 25
64
65    def tearDown(self):
66        smtplib.socket = socket
67
68    # This method is no longer used but is retained for backward compatibility,
69    # so test to make sure it still works.
70    def testQuoteData(self):
71        teststr  = "abc\n.jkl\rfoo\r\n..blue"
72        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
73        self.assertEqual(expected, smtplib.quotedata(teststr))
74
75    def testBasic1(self):
76        mock_socket.reply_with(b"220 Hola mundo")
77        # connects
78        smtp = smtplib.SMTP(HOST, self.port)
79        smtp.close()
80
81    def testSourceAddress(self):
82        mock_socket.reply_with(b"220 Hola mundo")
83        # connects
84        smtp = smtplib.SMTP(HOST, self.port,
85                source_address=('127.0.0.1',19876))
86        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
87        smtp.close()
88
89    def testBasic2(self):
90        mock_socket.reply_with(b"220 Hola mundo")
91        # connects, include port in host name
92        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
93        smtp.close()
94
95    def testLocalHostName(self):
96        mock_socket.reply_with(b"220 Hola mundo")
97        # check that supplied local_hostname is used
98        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
99        self.assertEqual(smtp.local_hostname, "testhost")
100        smtp.close()
101
102    def testTimeoutDefault(self):
103        mock_socket.reply_with(b"220 Hola mundo")
104        self.assertIsNone(mock_socket.getdefaulttimeout())
105        mock_socket.setdefaulttimeout(30)
106        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
107        try:
108            smtp = smtplib.SMTP(HOST, self.port)
109        finally:
110            mock_socket.setdefaulttimeout(None)
111        self.assertEqual(smtp.sock.gettimeout(), 30)
112        smtp.close()
113
114    def testTimeoutNone(self):
115        mock_socket.reply_with(b"220 Hola mundo")
116        self.assertIsNone(socket.getdefaulttimeout())
117        socket.setdefaulttimeout(30)
118        try:
119            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
120        finally:
121            socket.setdefaulttimeout(None)
122        self.assertIsNone(smtp.sock.gettimeout())
123        smtp.close()
124
125    def testTimeoutValue(self):
126        mock_socket.reply_with(b"220 Hola mundo")
127        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
128        self.assertEqual(smtp.sock.gettimeout(), 30)
129        smtp.close()
130
131    def test_debuglevel(self):
132        mock_socket.reply_with(b"220 Hello world")
133        smtp = smtplib.SMTP()
134        smtp.set_debuglevel(1)
135        with support.captured_stderr() as stderr:
136            smtp.connect(HOST, self.port)
137        smtp.close()
138        expected = re.compile(r"^connect:", re.MULTILINE)
139        self.assertRegex(stderr.getvalue(), expected)
140
141    def test_debuglevel_2(self):
142        mock_socket.reply_with(b"220 Hello world")
143        smtp = smtplib.SMTP()
144        smtp.set_debuglevel(2)
145        with support.captured_stderr() as stderr:
146            smtp.connect(HOST, self.port)
147        smtp.close()
148        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
149                              re.MULTILINE)
150        self.assertRegex(stderr.getvalue(), expected)
151
152
153# Test server thread using the specified SMTP server class
154def debugging_server(serv, serv_evt, client_evt):
155    serv_evt.set()
156
157    try:
158        if hasattr(select, 'poll'):
159            poll_fun = asyncore.poll2
160        else:
161            poll_fun = asyncore.poll
162
163        n = 1000
164        while asyncore.socket_map and n > 0:
165            poll_fun(0.01, asyncore.socket_map)
166
167            # when the client conversation is finished, it will
168            # set client_evt, and it's then ok to kill the server
169            if client_evt.is_set():
170                serv.close()
171                break
172
173            n -= 1
174
175    except socket.timeout:
176        pass
177    finally:
178        if not client_evt.is_set():
179            # allow some time for the client to read the result
180            time.sleep(0.5)
181            serv.close()
182        asyncore.close_all()
183        serv_evt.set()
184
185MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
186MSG_END = '------------ END MESSAGE ------------\n'
187
188# NOTE: Some SMTP objects in the tests below are created with a non-default
189# local_hostname argument to the constructor, since (on some systems) the FQDN
190# lookup caused by the default local_hostname sometimes takes so long that the
191# test server times out, causing the test to fail.
192
193# Test behavior of smtpd.DebuggingServer
194@unittest.skipUnless(threading, 'Threading required for this test.')
195class DebuggingServerTests(unittest.TestCase):
196
197    maxDiff = None
198
199    def setUp(self):
200        self.real_getfqdn = socket.getfqdn
201        socket.getfqdn = mock_socket.getfqdn
202        # temporarily replace sys.stdout to capture DebuggingServer output
203        self.old_stdout = sys.stdout
204        self.output = io.StringIO()
205        sys.stdout = self.output
206
207        self.serv_evt = threading.Event()
208        self.client_evt = threading.Event()
209        # Capture SMTPChannel debug output
210        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
211        smtpd.DEBUGSTREAM = io.StringIO()
212        # Pick a random unused port by passing 0 for the port number
213        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
214                                          decode_data=True)
215        # Keep a note of what port was assigned
216        self.port = self.serv.socket.getsockname()[1]
217        serv_args = (self.serv, self.serv_evt, self.client_evt)
218        self.thread = threading.Thread(target=debugging_server, args=serv_args)
219        self.thread.start()
220
221        # wait until server thread has assigned a port number
222        self.serv_evt.wait()
223        self.serv_evt.clear()
224
225    def tearDown(self):
226        socket.getfqdn = self.real_getfqdn
227        # indicate that the client is finished
228        self.client_evt.set()
229        # wait for the server thread to terminate
230        self.serv_evt.wait()
231        self.thread.join()
232        # restore sys.stdout
233        sys.stdout = self.old_stdout
234        # restore DEBUGSTREAM
235        smtpd.DEBUGSTREAM.close()
236        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
237
238    def testBasic(self):
239        # connect
240        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
241        smtp.quit()
242
243    def testSourceAddress(self):
244        # connect
245        port = support.find_unused_port()
246        try:
247            smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
248                    timeout=3, source_address=('127.0.0.1', port))
249            self.assertEqual(smtp.source_address, ('127.0.0.1', port))
250            self.assertEqual(smtp.local_hostname, 'localhost')
251            smtp.quit()
252        except OSError as e:
253            if e.errno == errno.EADDRINUSE:
254                self.skipTest("couldn't bind to port %d" % port)
255            raise
256
257    def testNOOP(self):
258        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
259        expected = (250, b'OK')
260        self.assertEqual(smtp.noop(), expected)
261        smtp.quit()
262
263    def testRSET(self):
264        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
265        expected = (250, b'OK')
266        self.assertEqual(smtp.rset(), expected)
267        smtp.quit()
268
269    def testELHO(self):
270        # EHLO isn't implemented in DebuggingServer
271        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
272        expected = (250, b'\nSIZE 33554432\nHELP')
273        self.assertEqual(smtp.ehlo(), expected)
274        smtp.quit()
275
276    def testEXPNNotImplemented(self):
277        # EXPN isn't implemented in DebuggingServer
278        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
279        expected = (502, b'EXPN not implemented')
280        smtp.putcmd('EXPN')
281        self.assertEqual(smtp.getreply(), expected)
282        smtp.quit()
283
284    def testVRFY(self):
285        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
286        expected = (252, b'Cannot VRFY user, but will accept message ' + \
287                         b'and attempt delivery')
288        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
289        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
290        smtp.quit()
291
292    def testSecondHELO(self):
293        # check that a second HELO returns a message that it's a duplicate
294        # (this behavior is specific to smtpd.SMTPChannel)
295        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
296        smtp.helo()
297        expected = (503, b'Duplicate HELO/EHLO')
298        self.assertEqual(smtp.helo(), expected)
299        smtp.quit()
300
301    def testHELP(self):
302        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
303        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
304                                      b'RCPT DATA RSET NOOP QUIT VRFY')
305        smtp.quit()
306
307    def testSend(self):
308        # connect and send mail
309        m = 'A test message'
310        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
311        smtp.sendmail('John', 'Sally', m)
312        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
313        # in asyncore.  This sleep might help, but should really be fixed
314        # properly by using an Event variable.
315        time.sleep(0.01)
316        smtp.quit()
317
318        self.client_evt.set()
319        self.serv_evt.wait()
320        self.output.flush()
321        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
322        self.assertEqual(self.output.getvalue(), mexpect)
323
324    def testSendBinary(self):
325        m = b'A test message'
326        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
327        smtp.sendmail('John', 'Sally', m)
328        # XXX (see comment in testSend)
329        time.sleep(0.01)
330        smtp.quit()
331
332        self.client_evt.set()
333        self.serv_evt.wait()
334        self.output.flush()
335        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
336        self.assertEqual(self.output.getvalue(), mexpect)
337
338    def testSendNeedingDotQuote(self):
339        # Issue 12283
340        m = '.A test\n.mes.sage.'
341        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
342        smtp.sendmail('John', 'Sally', m)
343        # XXX (see comment in testSend)
344        time.sleep(0.01)
345        smtp.quit()
346
347        self.client_evt.set()
348        self.serv_evt.wait()
349        self.output.flush()
350        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
351        self.assertEqual(self.output.getvalue(), mexpect)
352
353    def testSendNullSender(self):
354        m = 'A test message'
355        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
356        smtp.sendmail('<>', 'Sally', m)
357        # XXX (see comment in testSend)
358        time.sleep(0.01)
359        smtp.quit()
360
361        self.client_evt.set()
362        self.serv_evt.wait()
363        self.output.flush()
364        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
365        self.assertEqual(self.output.getvalue(), mexpect)
366        debugout = smtpd.DEBUGSTREAM.getvalue()
367        sender = re.compile("^sender: <>$", re.MULTILINE)
368        self.assertRegex(debugout, sender)
369
370    def testSendMessage(self):
371        m = email.mime.text.MIMEText('A test message')
372        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
373        smtp.send_message(m, from_addr='John', to_addrs='Sally')
374        # XXX (see comment in testSend)
375        time.sleep(0.01)
376        smtp.quit()
377
378        self.client_evt.set()
379        self.serv_evt.wait()
380        self.output.flush()
381        # Add the X-Peer header that DebuggingServer adds
382        m['X-Peer'] = socket.gethostbyname('localhost')
383        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
384        self.assertEqual(self.output.getvalue(), mexpect)
385
386    def testSendMessageWithAddresses(self):
387        m = email.mime.text.MIMEText('A test message')
388        m['From'] = 'foo@bar.com'
389        m['To'] = 'John'
390        m['CC'] = 'Sally, Fred'
391        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
392        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
393        smtp.send_message(m)
394        # XXX (see comment in testSend)
395        time.sleep(0.01)
396        smtp.quit()
397        # make sure the Bcc header is still in the message.
398        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
399                                    '<warped@silly.walks.com>')
400
401        self.client_evt.set()
402        self.serv_evt.wait()
403        self.output.flush()
404        # Add the X-Peer header that DebuggingServer adds
405        m['X-Peer'] = socket.gethostbyname('localhost')
406        # The Bcc header should not be transmitted.
407        del m['Bcc']
408        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
409        self.assertEqual(self.output.getvalue(), mexpect)
410        debugout = smtpd.DEBUGSTREAM.getvalue()
411        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
412        self.assertRegex(debugout, sender)
413        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
414                     'warped@silly.walks.com'):
415            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
416                                 re.MULTILINE)
417            self.assertRegex(debugout, to_addr)
418
419    def testSendMessageWithSomeAddresses(self):
420        # Make sure nothing breaks if not all of the three 'to' headers exist
421        m = email.mime.text.MIMEText('A test message')
422        m['From'] = 'foo@bar.com'
423        m['To'] = 'John, Dinsdale'
424        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
425        smtp.send_message(m)
426        # XXX (see comment in testSend)
427        time.sleep(0.01)
428        smtp.quit()
429
430        self.client_evt.set()
431        self.serv_evt.wait()
432        self.output.flush()
433        # Add the X-Peer header that DebuggingServer adds
434        m['X-Peer'] = socket.gethostbyname('localhost')
435        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
436        self.assertEqual(self.output.getvalue(), mexpect)
437        debugout = smtpd.DEBUGSTREAM.getvalue()
438        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
439        self.assertRegex(debugout, sender)
440        for addr in ('John', 'Dinsdale'):
441            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
442                                 re.MULTILINE)
443            self.assertRegex(debugout, to_addr)
444
445    def testSendMessageWithSpecifiedAddresses(self):
446        # Make sure addresses specified in call override those in message.
447        m = email.mime.text.MIMEText('A test message')
448        m['From'] = 'foo@bar.com'
449        m['To'] = 'John, Dinsdale'
450        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
451        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
452        # XXX (see comment in testSend)
453        time.sleep(0.01)
454        smtp.quit()
455
456        self.client_evt.set()
457        self.serv_evt.wait()
458        self.output.flush()
459        # Add the X-Peer header that DebuggingServer adds
460        m['X-Peer'] = socket.gethostbyname('localhost')
461        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
462        self.assertEqual(self.output.getvalue(), mexpect)
463        debugout = smtpd.DEBUGSTREAM.getvalue()
464        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
465        self.assertRegex(debugout, sender)
466        for addr in ('John', 'Dinsdale'):
467            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
468                                 re.MULTILINE)
469            self.assertNotRegex(debugout, to_addr)
470        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
471        self.assertRegex(debugout, recip)
472
473    def testSendMessageWithMultipleFrom(self):
474        # Sender overrides To
475        m = email.mime.text.MIMEText('A test message')
476        m['From'] = 'Bernard, Bianca'
477        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
478        m['To'] = 'John, Dinsdale'
479        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
480        smtp.send_message(m)
481        # XXX (see comment in testSend)
482        time.sleep(0.01)
483        smtp.quit()
484
485        self.client_evt.set()
486        self.serv_evt.wait()
487        self.output.flush()
488        # Add the X-Peer header that DebuggingServer adds
489        m['X-Peer'] = socket.gethostbyname('localhost')
490        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
491        self.assertEqual(self.output.getvalue(), mexpect)
492        debugout = smtpd.DEBUGSTREAM.getvalue()
493        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
494        self.assertRegex(debugout, sender)
495        for addr in ('John', 'Dinsdale'):
496            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
497                                 re.MULTILINE)
498            self.assertRegex(debugout, to_addr)
499
500    def testSendMessageResent(self):
501        m = email.mime.text.MIMEText('A test message')
502        m['From'] = 'foo@bar.com'
503        m['To'] = 'John'
504        m['CC'] = 'Sally, Fred'
505        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
506        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
507        m['Resent-From'] = 'holy@grail.net'
508        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
509        m['Resent-Bcc'] = 'doe@losthope.net'
510        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
511        smtp.send_message(m)
512        # XXX (see comment in testSend)
513        time.sleep(0.01)
514        smtp.quit()
515
516        self.client_evt.set()
517        self.serv_evt.wait()
518        self.output.flush()
519        # The Resent-Bcc headers are deleted before serialization.
520        del m['Bcc']
521        del m['Resent-Bcc']
522        # Add the X-Peer header that DebuggingServer adds
523        m['X-Peer'] = socket.gethostbyname('localhost')
524        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
525        self.assertEqual(self.output.getvalue(), mexpect)
526        debugout = smtpd.DEBUGSTREAM.getvalue()
527        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
528        self.assertRegex(debugout, sender)
529        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
530            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
531                                 re.MULTILINE)
532            self.assertRegex(debugout, to_addr)
533
534    def testSendMessageMultipleResentRaises(self):
535        m = email.mime.text.MIMEText('A test message')
536        m['From'] = 'foo@bar.com'
537        m['To'] = 'John'
538        m['CC'] = 'Sally, Fred'
539        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
540        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
541        m['Resent-From'] = 'holy@grail.net'
542        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
543        m['Resent-Bcc'] = 'doe@losthope.net'
544        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
545        m['Resent-To'] = 'holy@grail.net'
546        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
547        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
548        with self.assertRaises(ValueError):
549            smtp.send_message(m)
550        smtp.close()
551
552class NonConnectingTests(unittest.TestCase):
553
554    def testNotConnected(self):
555        # Test various operations on an unconnected SMTP object that
556        # should raise exceptions (at present the attempt in SMTP.send
557        # to reference the nonexistent 'sock' attribute of the SMTP object
558        # causes an AttributeError)
559        smtp = smtplib.SMTP()
560        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
561        self.assertRaises(smtplib.SMTPServerDisconnected,
562                          smtp.send, 'test msg')
563
564    def testNonnumericPort(self):
565        # check that non-numeric port raises OSError
566        self.assertRaises(OSError, smtplib.SMTP,
567                          "localhost", "bogus")
568        self.assertRaises(OSError, smtplib.SMTP,
569                          "localhost:bogus")
570
571
572# test response of client to a non-successful HELO message
573@unittest.skipUnless(threading, 'Threading required for this test.')
574class BadHELOServerTests(unittest.TestCase):
575
576    def setUp(self):
577        smtplib.socket = mock_socket
578        mock_socket.reply_with(b"199 no hello for you!")
579        self.old_stdout = sys.stdout
580        self.output = io.StringIO()
581        sys.stdout = self.output
582        self.port = 25
583
584    def tearDown(self):
585        smtplib.socket = socket
586        sys.stdout = self.old_stdout
587
588    def testFailingHELO(self):
589        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
590                            HOST, self.port, 'localhost', 3)
591
592
593@unittest.skipUnless(threading, 'Threading required for this test.')
594class TooLongLineTests(unittest.TestCase):
595    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
596
597    def setUp(self):
598        self.old_stdout = sys.stdout
599        self.output = io.StringIO()
600        sys.stdout = self.output
601
602        self.evt = threading.Event()
603        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
604        self.sock.settimeout(15)
605        self.port = support.bind_port(self.sock)
606        servargs = (self.evt, self.respdata, self.sock)
607        threading.Thread(target=server, args=servargs).start()
608        self.evt.wait()
609        self.evt.clear()
610
611    def tearDown(self):
612        self.evt.wait()
613        sys.stdout = self.old_stdout
614
615    def testLineTooLong(self):
616        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
617                          HOST, self.port, 'localhost', 3)
618
619
620sim_users = {'Mr.A@somewhere.com':'John A',
621             'Ms.B@xn--fo-fka.com':'Sally B',
622             'Mrs.C@somewhereesle.com':'Ruth C',
623            }
624
625sim_auth = ('Mr.A@somewhere.com', 'somepassword')
626sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
627                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
628sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
629             'list-2':['Ms.B@xn--fo-fka.com',],
630            }
631
632# Simulated SMTP channel & server
633class ResponseException(Exception): pass
634class SimSMTPChannel(smtpd.SMTPChannel):
635
636    quit_response = None
637    mail_response = None
638    rcpt_response = None
639    data_response = None
640    rcpt_count = 0
641    rset_count = 0
642    disconnect = 0
643    AUTH = 99    # Add protocol state to enable auth testing.
644    authenticated_user = None
645
646    def __init__(self, extra_features, *args, **kw):
647        self._extrafeatures = ''.join(
648            [ "250-{0}\r\n".format(x) for x in extra_features ])
649        super(SimSMTPChannel, self).__init__(*args, **kw)
650
651    # AUTH related stuff.  It would be nice if support for this were in smtpd.
652    def found_terminator(self):
653        if self.smtp_state == self.AUTH:
654            line = self._emptystring.join(self.received_lines)
655            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
656            self.received_lines = []
657            try:
658                self.auth_object(line)
659            except ResponseException as e:
660                self.smtp_state = self.COMMAND
661                self.push('%s %s' % (e.smtp_code, e.smtp_error))
662                return
663        super().found_terminator()
664
665
666    def smtp_AUTH(self, arg):
667        if not self.seen_greeting:
668            self.push('503 Error: send EHLO first')
669            return
670        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
671            self.push('500 Error: command "AUTH" not recognized')
672            return
673        if self.authenticated_user is not None:
674            self.push(
675                '503 Bad sequence of commands: already authenticated')
676            return
677        args = arg.split()
678        if len(args) not in [1, 2]:
679            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
680            return
681        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
682        try:
683            self.auth_object = getattr(self, auth_object_name)
684        except AttributeError:
685            self.push('504 Command parameter not implemented: unsupported '
686                      ' authentication mechanism {!r}'.format(auth_object_name))
687            return
688        self.smtp_state = self.AUTH
689        self.auth_object(args[1] if len(args) == 2 else None)
690
691    def _authenticated(self, user, valid):
692        if valid:
693            self.authenticated_user = user
694            self.push('235 Authentication Succeeded')
695        else:
696            self.push('535 Authentication credentials invalid')
697        self.smtp_state = self.COMMAND
698
699    def _decode_base64(self, string):
700        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
701
702    def _auth_plain(self, arg=None):
703        if arg is None:
704            self.push('334 ')
705        else:
706            logpass = self._decode_base64(arg)
707            try:
708                *_, user, password = logpass.split('\0')
709            except ValueError as e:
710                self.push('535 Splitting response {!r} into user and password'
711                          ' failed: {}'.format(logpass, e))
712                return
713            self._authenticated(user, password == sim_auth[1])
714
715    def _auth_login(self, arg=None):
716        if arg is None:
717            # base64 encoded 'Username:'
718            self.push('334 VXNlcm5hbWU6')
719        elif not hasattr(self, '_auth_login_user'):
720            self._auth_login_user = self._decode_base64(arg)
721            # base64 encoded 'Password:'
722            self.push('334 UGFzc3dvcmQ6')
723        else:
724            password = self._decode_base64(arg)
725            self._authenticated(self._auth_login_user, password == sim_auth[1])
726            del self._auth_login_user
727
728    def _auth_cram_md5(self, arg=None):
729        if arg is None:
730            self.push('334 {}'.format(sim_cram_md5_challenge))
731        else:
732            logpass = self._decode_base64(arg)
733            try:
734                user, hashed_pass = logpass.split()
735            except ValueError as e:
736                self.push('535 Splitting response {!r} into user and password'
737                          'failed: {}'.format(logpass, e))
738                return False
739            valid_hashed_pass = hmac.HMAC(
740                sim_auth[1].encode('ascii'),
741                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
742                'md5').hexdigest()
743            self._authenticated(user, hashed_pass == valid_hashed_pass)
744    # end AUTH related stuff.
745
746    def smtp_EHLO(self, arg):
747        resp = ('250-testhost\r\n'
748                '250-EXPN\r\n'
749                '250-SIZE 20000000\r\n'
750                '250-STARTTLS\r\n'
751                '250-DELIVERBY\r\n')
752        resp = resp + self._extrafeatures + '250 HELP'
753        self.push(resp)
754        self.seen_greeting = arg
755        self.extended_smtp = True
756
757    def smtp_VRFY(self, arg):
758        # For max compatibility smtplib should be sending the raw address.
759        if arg in sim_users:
760            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
761        else:
762            self.push('550 No such user: %s' % arg)
763
764    def smtp_EXPN(self, arg):
765        list_name = arg.lower()
766        if list_name in sim_lists:
767            user_list = sim_lists[list_name]
768            for n, user_email in enumerate(user_list):
769                quoted_addr = smtplib.quoteaddr(user_email)
770                if n < len(user_list) - 1:
771                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
772                else:
773                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
774        else:
775            self.push('550 No access for you!')
776
777    def smtp_QUIT(self, arg):
778        if self.quit_response is None:
779            super(SimSMTPChannel, self).smtp_QUIT(arg)
780        else:
781            self.push(self.quit_response)
782            self.close_when_done()
783
784    def smtp_MAIL(self, arg):
785        if self.mail_response is None:
786            super().smtp_MAIL(arg)
787        else:
788            self.push(self.mail_response)
789            if self.disconnect:
790                self.close_when_done()
791
792    def smtp_RCPT(self, arg):
793        if self.rcpt_response is None:
794            super().smtp_RCPT(arg)
795            return
796        self.rcpt_count += 1
797        self.push(self.rcpt_response[self.rcpt_count-1])
798
799    def smtp_RSET(self, arg):
800        self.rset_count += 1
801        super().smtp_RSET(arg)
802
803    def smtp_DATA(self, arg):
804        if self.data_response is None:
805            super().smtp_DATA(arg)
806        else:
807            self.push(self.data_response)
808
809    def handle_error(self):
810        raise
811
812
813class SimSMTPServer(smtpd.SMTPServer):
814
815    channel_class = SimSMTPChannel
816
817    def __init__(self, *args, **kw):
818        self._extra_features = []
819        smtpd.SMTPServer.__init__(self, *args, **kw)
820
821    def handle_accepted(self, conn, addr):
822        self._SMTPchannel = self.channel_class(
823            self._extra_features, self, conn, addr,
824            decode_data=self._decode_data)
825
826    def process_message(self, peer, mailfrom, rcpttos, data):
827        pass
828
829    def add_feature(self, feature):
830        self._extra_features.append(feature)
831
832    def handle_error(self):
833        raise
834
835
836# Test various SMTP & ESMTP commands/behaviors that require a simulated server
837# (i.e., something with more features than DebuggingServer)
838@unittest.skipUnless(threading, 'Threading required for this test.')
839class SMTPSimTests(unittest.TestCase):
840
841    def setUp(self):
842        self.real_getfqdn = socket.getfqdn
843        socket.getfqdn = mock_socket.getfqdn
844        self.serv_evt = threading.Event()
845        self.client_evt = threading.Event()
846        # Pick a random unused port by passing 0 for the port number
847        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
848        # Keep a note of what port was assigned
849        self.port = self.serv.socket.getsockname()[1]
850        serv_args = (self.serv, self.serv_evt, self.client_evt)
851        self.thread = threading.Thread(target=debugging_server, args=serv_args)
852        self.thread.start()
853
854        # wait until server thread has assigned a port number
855        self.serv_evt.wait()
856        self.serv_evt.clear()
857
858    def tearDown(self):
859        socket.getfqdn = self.real_getfqdn
860        # indicate that the client is finished
861        self.client_evt.set()
862        # wait for the server thread to terminate
863        self.serv_evt.wait()
864        self.thread.join()
865
866    def testBasic(self):
867        # smoke test
868        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
869        smtp.quit()
870
871    def testEHLO(self):
872        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
873
874        # no features should be present before the EHLO
875        self.assertEqual(smtp.esmtp_features, {})
876
877        # features expected from the test server
878        expected_features = {'expn':'',
879                             'size': '20000000',
880                             'starttls': '',
881                             'deliverby': '',
882                             'help': '',
883                             }
884
885        smtp.ehlo()
886        self.assertEqual(smtp.esmtp_features, expected_features)
887        for k in expected_features:
888            self.assertTrue(smtp.has_extn(k))
889        self.assertFalse(smtp.has_extn('unsupported-feature'))
890        smtp.quit()
891
892    def testVRFY(self):
893        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
894
895        for addr_spec, name in sim_users.items():
896            expected_known = (250, bytes('%s %s' %
897                                         (name, smtplib.quoteaddr(addr_spec)),
898                                         "ascii"))
899            self.assertEqual(smtp.vrfy(addr_spec), expected_known)
900
901        u = 'nobody@nowhere.com'
902        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
903        self.assertEqual(smtp.vrfy(u), expected_unknown)
904        smtp.quit()
905
906    def testEXPN(self):
907        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
908
909        for listname, members in sim_lists.items():
910            users = []
911            for m in members:
912                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
913            expected_known = (250, bytes('\n'.join(users), "ascii"))
914            self.assertEqual(smtp.expn(listname), expected_known)
915
916        u = 'PSU-Members-List'
917        expected_unknown = (550, b'No access for you!')
918        self.assertEqual(smtp.expn(u), expected_unknown)
919        smtp.quit()
920
921    def testAUTH_PLAIN(self):
922        self.serv.add_feature("AUTH PLAIN")
923        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
924        resp = smtp.login(sim_auth[0], sim_auth[1])
925        self.assertEqual(resp, (235, b'Authentication Succeeded'))
926        smtp.close()
927
928    def testAUTH_LOGIN(self):
929        self.serv.add_feature("AUTH LOGIN")
930        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
931        resp = smtp.login(sim_auth[0], sim_auth[1])
932        self.assertEqual(resp, (235, b'Authentication Succeeded'))
933        smtp.close()
934
935    def testAUTH_CRAM_MD5(self):
936        self.serv.add_feature("AUTH CRAM-MD5")
937        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
938        resp = smtp.login(sim_auth[0], sim_auth[1])
939        self.assertEqual(resp, (235, b'Authentication Succeeded'))
940        smtp.close()
941
942    def testAUTH_multiple(self):
943        # Test that multiple authentication methods are tried.
944        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
945        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
946        resp = smtp.login(sim_auth[0], sim_auth[1])
947        self.assertEqual(resp, (235, b'Authentication Succeeded'))
948        smtp.close()
949
950    def test_auth_function(self):
951        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
952        for mechanism in supported:
953            self.serv.add_feature("AUTH {}".format(mechanism))
954        for mechanism in supported:
955            with self.subTest(mechanism=mechanism):
956                smtp = smtplib.SMTP(HOST, self.port,
957                                    local_hostname='localhost', timeout=15)
958                smtp.ehlo('foo')
959                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
960                method = 'auth_' + mechanism.lower().replace('-', '_')
961                resp = smtp.auth(mechanism, getattr(smtp, method))
962                self.assertEqual(resp, (235, b'Authentication Succeeded'))
963                smtp.close()
964
965    def test_quit_resets_greeting(self):
966        smtp = smtplib.SMTP(HOST, self.port,
967                            local_hostname='localhost',
968                            timeout=15)
969        code, message = smtp.ehlo()
970        self.assertEqual(code, 250)
971        self.assertIn('size', smtp.esmtp_features)
972        smtp.quit()
973        self.assertNotIn('size', smtp.esmtp_features)
974        smtp.connect(HOST, self.port)
975        self.assertNotIn('size', smtp.esmtp_features)
976        smtp.ehlo_or_helo_if_needed()
977        self.assertIn('size', smtp.esmtp_features)
978        smtp.quit()
979
980    def test_with_statement(self):
981        with smtplib.SMTP(HOST, self.port) as smtp:
982            code, message = smtp.noop()
983            self.assertEqual(code, 250)
984        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
985        with smtplib.SMTP(HOST, self.port) as smtp:
986            smtp.close()
987        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
988
989    def test_with_statement_QUIT_failure(self):
990        with self.assertRaises(smtplib.SMTPResponseException) as error:
991            with smtplib.SMTP(HOST, self.port) as smtp:
992                smtp.noop()
993                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
994        self.assertEqual(error.exception.smtp_code, 421)
995        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
996
997    #TODO: add tests for correct AUTH method fallback now that the
998    #test infrastructure can support it.
999
1000    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
1001    def test__rest_from_mail_cmd(self):
1002        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1003        smtp.noop()
1004        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
1005        self.serv._SMTPchannel.disconnect = True
1006        with self.assertRaises(smtplib.SMTPSenderRefused):
1007            smtp.sendmail('John', 'Sally', 'test message')
1008        self.assertIsNone(smtp.sock)
1009
1010    # Issue 5713: make sure close, not rset, is called if we get a 421 error
1011    def test_421_from_mail_cmd(self):
1012        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1013        smtp.noop()
1014        self.serv._SMTPchannel.mail_response = '421 closing connection'
1015        with self.assertRaises(smtplib.SMTPSenderRefused):
1016            smtp.sendmail('John', 'Sally', 'test message')
1017        self.assertIsNone(smtp.sock)
1018        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1019
1020    def test_421_from_rcpt_cmd(self):
1021        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1022        smtp.noop()
1023        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
1024        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
1025            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
1026        self.assertIsNone(smtp.sock)
1027        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1028        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
1029
1030    def test_421_from_data_cmd(self):
1031        class MySimSMTPChannel(SimSMTPChannel):
1032            def found_terminator(self):
1033                if self.smtp_state == self.DATA:
1034                    self.push('421 closing')
1035                else:
1036                    super().found_terminator()
1037        self.serv.channel_class = MySimSMTPChannel
1038        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
1039        smtp.noop()
1040        with self.assertRaises(smtplib.SMTPDataError):
1041            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
1042        self.assertIsNone(smtp.sock)
1043        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
1044
1045    def test_smtputf8_NotSupportedError_if_no_server_support(self):
1046        smtp = smtplib.SMTP(
1047            HOST, self.port, local_hostname='localhost', timeout=3)
1048        self.addCleanup(smtp.close)
1049        smtp.ehlo()
1050        self.assertTrue(smtp.does_esmtp)
1051        self.assertFalse(smtp.has_extn('smtputf8'))
1052        self.assertRaises(
1053            smtplib.SMTPNotSupportedError,
1054            smtp.sendmail,
1055            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1056        self.assertRaises(
1057            smtplib.SMTPNotSupportedError,
1058            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
1059
1060    def test_send_unicode_without_SMTPUTF8(self):
1061        smtp = smtplib.SMTP(
1062            HOST, self.port, local_hostname='localhost', timeout=3)
1063        self.addCleanup(smtp.close)
1064        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
1065        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
1066
1067
1068class SimSMTPUTF8Server(SimSMTPServer):
1069
1070    def __init__(self, *args, **kw):
1071        # The base SMTP server turns these on automatically, but our test
1072        # server is set up to munge the EHLO response, so we need to provide
1073        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
1074        self._extra_features = ['SMTPUTF8', '8BITMIME']
1075        smtpd.SMTPServer.__init__(self, *args, **kw)
1076
1077    def handle_accepted(self, conn, addr):
1078        self._SMTPchannel = self.channel_class(
1079            self._extra_features, self, conn, addr,
1080            decode_data=self._decode_data,
1081            enable_SMTPUTF8=self.enable_SMTPUTF8,
1082        )
1083
1084    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
1085                                                             rcpt_options=None):
1086        self.last_peer = peer
1087        self.last_mailfrom = mailfrom
1088        self.last_rcpttos = rcpttos
1089        self.last_message = data
1090        self.last_mail_options = mail_options
1091        self.last_rcpt_options = rcpt_options
1092
1093
1094@unittest.skipUnless(threading, 'Threading required for this test.')
1095class SMTPUTF8SimTests(unittest.TestCase):
1096
1097    maxDiff = None
1098
1099    def setUp(self):
1100        self.real_getfqdn = socket.getfqdn
1101        socket.getfqdn = mock_socket.getfqdn
1102        self.serv_evt = threading.Event()
1103        self.client_evt = threading.Event()
1104        # Pick a random unused port by passing 0 for the port number
1105        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
1106                                      decode_data=False,
1107                                      enable_SMTPUTF8=True)
1108        # Keep a note of what port was assigned
1109        self.port = self.serv.socket.getsockname()[1]
1110        serv_args = (self.serv, self.serv_evt, self.client_evt)
1111        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1112        self.thread.start()
1113
1114        # wait until server thread has assigned a port number
1115        self.serv_evt.wait()
1116        self.serv_evt.clear()
1117
1118    def tearDown(self):
1119        socket.getfqdn = self.real_getfqdn
1120        # indicate that the client is finished
1121        self.client_evt.set()
1122        # wait for the server thread to terminate
1123        self.serv_evt.wait()
1124        self.thread.join()
1125
1126    def test_test_server_supports_extensions(self):
1127        smtp = smtplib.SMTP(
1128            HOST, self.port, local_hostname='localhost', timeout=3)
1129        self.addCleanup(smtp.close)
1130        smtp.ehlo()
1131        self.assertTrue(smtp.does_esmtp)
1132        self.assertTrue(smtp.has_extn('smtputf8'))
1133
1134    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
1135        m = '¡a test message containing unicode!'.encode('utf-8')
1136        smtp = smtplib.SMTP(
1137            HOST, self.port, local_hostname='localhost', timeout=3)
1138        self.addCleanup(smtp.close)
1139        smtp.sendmail('Jőhn', 'Sálly', m,
1140                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1141        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
1142        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
1143        self.assertEqual(self.serv.last_message, m)
1144        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1145        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1146        self.assertEqual(self.serv.last_rcpt_options, [])
1147
1148    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
1149        m = '¡a test message containing unicode!'.encode('utf-8')
1150        smtp = smtplib.SMTP(
1151            HOST, self.port, local_hostname='localhost', timeout=3)
1152        self.addCleanup(smtp.close)
1153        smtp.ehlo()
1154        self.assertEqual(
1155            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
1156            (250, b'OK'))
1157        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
1158        self.assertEqual(smtp.data(m), (250, b'OK'))
1159        self.assertEqual(self.serv.last_mailfrom, 'Jő')
1160        self.assertEqual(self.serv.last_rcpttos, ['János'])
1161        self.assertEqual(self.serv.last_message, m)
1162        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1163        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1164        self.assertEqual(self.serv.last_rcpt_options, [])
1165
1166    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
1167        msg = EmailMessage()
1168        msg['From'] = "Páolo <főo@bar.com>"
1169        msg['To'] = 'Dinsdale'
1170        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1171        # XXX I don't know why I need two \n's here, but this is an existing
1172        # bug (if it is one) and not a problem with the new functionality.
1173        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
1174        # XXX smtpd converts received /r/n to /n, so we can't easily test that
1175        # we are successfully sending /r/n :(.
1176        expected = textwrap.dedent("""\
1177            From: Páolo <főo@bar.com>
1178            To: Dinsdale
1179            Subject: Nudge nudge, wink, wink \u1F609
1180            Content-Type: text/plain; charset="utf-8"
1181            Content-Transfer-Encoding: 8bit
1182            MIME-Version: 1.0
1183
1184            oh là là, know what I mean, know what I mean?
1185            """)
1186        smtp = smtplib.SMTP(
1187            HOST, self.port, local_hostname='localhost', timeout=3)
1188        self.addCleanup(smtp.close)
1189        self.assertEqual(smtp.send_message(msg), {})
1190        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
1191        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
1192        self.assertEqual(self.serv.last_message.decode(), expected)
1193        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1194        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1195        self.assertEqual(self.serv.last_rcpt_options, [])
1196
1197    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
1198        msg = EmailMessage()
1199        msg['From'] = "Páolo <főo@bar.com>"
1200        msg['To'] = 'Dinsdale'
1201        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1202        smtp = smtplib.SMTP(
1203            HOST, self.port, local_hostname='localhost', timeout=3)
1204        self.addCleanup(smtp.close)
1205        self.assertRaises(smtplib.SMTPNotSupportedError,
1206                          smtp.send_message(msg))
1207
1208
1209EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1210
1211class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
1212    def smtp_AUTH(self, arg):
1213        # RFC 4954's AUTH command allows for an optional initial-response.
1214        # Not all AUTH methods support this; some require a challenge.  AUTH
1215        # PLAIN does those, so test that here.  See issue #15014.
1216        args = arg.split()
1217        if args[0].lower() == 'plain':
1218            if len(args) == 2:
1219                # AUTH PLAIN <initial-response> with the response base 64
1220                # encoded.  Hard code the expected response for the test.
1221                if args[1] == EXPECTED_RESPONSE:
1222                    self.push('235 Ok')
1223                    return
1224        self.push('571 Bad authentication')
1225
1226class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
1227    channel_class = SimSMTPAUTHInitialResponseChannel
1228
1229
1230@unittest.skipUnless(threading, 'Threading required for this test.')
1231class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
1232    def setUp(self):
1233        self.real_getfqdn = socket.getfqdn
1234        socket.getfqdn = mock_socket.getfqdn
1235        self.serv_evt = threading.Event()
1236        self.client_evt = threading.Event()
1237        # Pick a random unused port by passing 0 for the port number
1238        self.serv = SimSMTPAUTHInitialResponseServer(
1239            (HOST, 0), ('nowhere', -1), decode_data=True)
1240        # Keep a note of what port was assigned
1241        self.port = self.serv.socket.getsockname()[1]
1242        serv_args = (self.serv, self.serv_evt, self.client_evt)
1243        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1244        self.thread.start()
1245
1246        # wait until server thread has assigned a port number
1247        self.serv_evt.wait()
1248        self.serv_evt.clear()
1249
1250    def tearDown(self):
1251        socket.getfqdn = self.real_getfqdn
1252        # indicate that the client is finished
1253        self.client_evt.set()
1254        # wait for the server thread to terminate
1255        self.serv_evt.wait()
1256        self.thread.join()
1257
1258    def testAUTH_PLAIN_initial_response_login(self):
1259        self.serv.add_feature('AUTH PLAIN')
1260        smtp = smtplib.SMTP(HOST, self.port,
1261                            local_hostname='localhost', timeout=15)
1262        smtp.login('psu', 'doesnotexist')
1263        smtp.close()
1264
1265    def testAUTH_PLAIN_initial_response_auth(self):
1266        self.serv.add_feature('AUTH PLAIN')
1267        smtp = smtplib.SMTP(HOST, self.port,
1268                            local_hostname='localhost', timeout=15)
1269        smtp.user = 'psu'
1270        smtp.password = 'doesnotexist'
1271        code, response = smtp.auth('plain', smtp.auth_plain)
1272        smtp.close()
1273        self.assertEqual(code, 235)
1274
1275
1276@support.reap_threads
1277def test_main(verbose=None):
1278    support.run_unittest(
1279        BadHELOServerTests,
1280        DebuggingServerTests,
1281        GeneralTests,
1282        NonConnectingTests,
1283        SMTPAUTHInitialResponseSimTests,
1284        SMTPSimTests,
1285        TooLongLineTests,
1286        )
1287
1288
1289if __name__ == '__main__':
1290    test_main()
1291