1import unittest 2import textwrap 3from test import support, mock_socket 4from test.support import socket_helper 5from test.support import warnings_helper 6import socket 7import io 8 9import warnings 10with warnings.catch_warnings(): 11 warnings.simplefilter('ignore', DeprecationWarning) 12 import smtpd 13 import asyncore 14 15 16class DummyServer(smtpd.SMTPServer): 17 def __init__(self, *args, **kwargs): 18 smtpd.SMTPServer.__init__(self, *args, **kwargs) 19 self.messages = [] 20 if self._decode_data: 21 self.return_status = 'return status' 22 else: 23 self.return_status = b'return status' 24 25 def process_message(self, peer, mailfrom, rcpttos, data, **kw): 26 self.messages.append((peer, mailfrom, rcpttos, data)) 27 if data == self.return_status: 28 return '250 Okish' 29 if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']: 30 return '250 SMTPUTF8 message okish' 31 32 33class DummyDispatcherBroken(Exception): 34 pass 35 36 37class BrokenDummyServer(DummyServer): 38 def listen(self, num): 39 raise DummyDispatcherBroken() 40 41 42class SMTPDServerTest(unittest.TestCase): 43 def setUp(self): 44 smtpd.socket = asyncore.socket = mock_socket 45 46 def test_process_message_unimplemented(self): 47 server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0), 48 decode_data=True) 49 conn, addr = server.accept() 50 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 51 52 def write_line(line): 53 channel.socket.queue_recv(line) 54 channel.handle_read() 55 56 write_line(b'HELO example') 57 write_line(b'MAIL From:eggs@example') 58 write_line(b'RCPT To:spam@example') 59 write_line(b'DATA') 60 self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') 61 62 def test_decode_data_and_enable_SMTPUTF8_raises(self): 63 self.assertRaises( 64 ValueError, 65 smtpd.SMTPServer, 66 (socket_helper.HOST, 0), 67 ('b', 0), 68 enable_SMTPUTF8=True, 69 decode_data=True) 70 71 def tearDown(self): 72 asyncore.close_all() 73 asyncore.socket = smtpd.socket = socket 74 75 76class DebuggingServerTest(unittest.TestCase): 77 78 def setUp(self): 79 smtpd.socket = asyncore.socket = mock_socket 80 81 def send_data(self, channel, data, enable_SMTPUTF8=False): 82 def write_line(line): 83 channel.socket.queue_recv(line) 84 channel.handle_read() 85 write_line(b'EHLO example') 86 if enable_SMTPUTF8: 87 write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') 88 else: 89 write_line(b'MAIL From:eggs@example') 90 write_line(b'RCPT To:spam@example') 91 write_line(b'DATA') 92 write_line(data) 93 write_line(b'.') 94 95 def test_process_message_with_decode_data_true(self): 96 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 97 decode_data=True) 98 conn, addr = server.accept() 99 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 100 with support.captured_stdout() as s: 101 self.send_data(channel, b'From: test\n\nhello\n') 102 stdout = s.getvalue() 103 self.assertEqual(stdout, textwrap.dedent("""\ 104 ---------- MESSAGE FOLLOWS ---------- 105 From: test 106 X-Peer: peer-address 107 108 hello 109 ------------ END MESSAGE ------------ 110 """)) 111 112 def test_process_message_with_decode_data_false(self): 113 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0)) 114 conn, addr = server.accept() 115 channel = smtpd.SMTPChannel(server, conn, addr) 116 with support.captured_stdout() as s: 117 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 118 stdout = s.getvalue() 119 self.assertEqual(stdout, textwrap.dedent("""\ 120 ---------- MESSAGE FOLLOWS ---------- 121 b'From: test' 122 b'X-Peer: peer-address' 123 b'' 124 b'h\\xc3\\xa9llo\\xff' 125 ------------ END MESSAGE ------------ 126 """)) 127 128 def test_process_message_with_enable_SMTPUTF8_true(self): 129 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 130 enable_SMTPUTF8=True) 131 conn, addr = server.accept() 132 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 133 with support.captured_stdout() as s: 134 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 135 stdout = s.getvalue() 136 self.assertEqual(stdout, textwrap.dedent("""\ 137 ---------- MESSAGE FOLLOWS ---------- 138 b'From: test' 139 b'X-Peer: peer-address' 140 b'' 141 b'h\\xc3\\xa9llo\\xff' 142 ------------ END MESSAGE ------------ 143 """)) 144 145 def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): 146 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 147 enable_SMTPUTF8=True) 148 conn, addr = server.accept() 149 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 150 with support.captured_stdout() as s: 151 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', 152 enable_SMTPUTF8=True) 153 stdout = s.getvalue() 154 self.assertEqual(stdout, textwrap.dedent("""\ 155 ---------- MESSAGE FOLLOWS ---------- 156 mail options: ['BODY=8BITMIME', 'SMTPUTF8'] 157 b'From: test' 158 b'X-Peer: peer-address' 159 b'' 160 b'h\\xc3\\xa9llo\\xff' 161 ------------ END MESSAGE ------------ 162 """)) 163 164 def tearDown(self): 165 asyncore.close_all() 166 asyncore.socket = smtpd.socket = socket 167 168 169class TestFamilyDetection(unittest.TestCase): 170 def setUp(self): 171 smtpd.socket = asyncore.socket = mock_socket 172 173 def tearDown(self): 174 asyncore.close_all() 175 asyncore.socket = smtpd.socket = socket 176 177 @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 178 def test_socket_uses_IPv6(self): 179 server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0)) 180 self.assertEqual(server.socket.family, socket.AF_INET6) 181 182 def test_socket_uses_IPv4(self): 183 server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0)) 184 self.assertEqual(server.socket.family, socket.AF_INET) 185 186 187class TestRcptOptionParsing(unittest.TestCase): 188 error_response = (b'555 RCPT TO parameters not recognized or not ' 189 b'implemented\r\n') 190 191 def setUp(self): 192 smtpd.socket = asyncore.socket = mock_socket 193 self.old_debugstream = smtpd.DEBUGSTREAM 194 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 195 196 def tearDown(self): 197 asyncore.close_all() 198 asyncore.socket = smtpd.socket = socket 199 smtpd.DEBUGSTREAM = self.old_debugstream 200 201 def write_line(self, channel, line): 202 channel.socket.queue_recv(line) 203 channel.handle_read() 204 205 def test_params_rejected(self): 206 server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 207 conn, addr = server.accept() 208 channel = smtpd.SMTPChannel(server, conn, addr) 209 self.write_line(channel, b'EHLO example') 210 self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 211 self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar') 212 self.assertEqual(channel.socket.last, self.error_response) 213 214 def test_nothing_accepted(self): 215 server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 216 conn, addr = server.accept() 217 channel = smtpd.SMTPChannel(server, conn, addr) 218 self.write_line(channel, b'EHLO example') 219 self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 220 self.write_line(channel, b'RCPT to: <foo@example.com>') 221 self.assertEqual(channel.socket.last, b'250 OK\r\n') 222 223 224class TestMailOptionParsing(unittest.TestCase): 225 error_response = (b'555 MAIL FROM parameters not recognized or not ' 226 b'implemented\r\n') 227 228 def setUp(self): 229 smtpd.socket = asyncore.socket = mock_socket 230 self.old_debugstream = smtpd.DEBUGSTREAM 231 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 232 233 def tearDown(self): 234 asyncore.close_all() 235 asyncore.socket = smtpd.socket = socket 236 smtpd.DEBUGSTREAM = self.old_debugstream 237 238 def write_line(self, channel, line): 239 channel.socket.queue_recv(line) 240 channel.handle_read() 241 242 def test_with_decode_data_true(self): 243 server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) 244 conn, addr = server.accept() 245 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 246 self.write_line(channel, b'EHLO example') 247 for line in [ 248 b'MAIL from: <foo@example.com> size=20 SMTPUTF8', 249 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', 250 b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN', 251 b'MAIL from: <foo@example.com> size=20 body=8bitmime', 252 ]: 253 self.write_line(channel, line) 254 self.assertEqual(channel.socket.last, self.error_response) 255 self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 256 self.assertEqual(channel.socket.last, b'250 OK\r\n') 257 258 def test_with_decode_data_false(self): 259 server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 260 conn, addr = server.accept() 261 channel = smtpd.SMTPChannel(server, conn, addr) 262 self.write_line(channel, b'EHLO example') 263 for line in [ 264 b'MAIL from: <foo@example.com> size=20 SMTPUTF8', 265 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', 266 ]: 267 self.write_line(channel, line) 268 self.assertEqual(channel.socket.last, self.error_response) 269 self.write_line( 270 channel, 271 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN') 272 self.assertEqual( 273 channel.socket.last, 274 b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n') 275 self.write_line( 276 channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime') 277 self.assertEqual(channel.socket.last, b'250 OK\r\n') 278 279 def test_with_enable_smtputf8_true(self): 280 server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) 281 conn, addr = server.accept() 282 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 283 self.write_line(channel, b'EHLO example') 284 self.write_line( 285 channel, 286 b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8') 287 self.assertEqual(channel.socket.last, b'250 OK\r\n') 288 289 290class SMTPDChannelTest(unittest.TestCase): 291 def setUp(self): 292 smtpd.socket = asyncore.socket = mock_socket 293 self.old_debugstream = smtpd.DEBUGSTREAM 294 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 295 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 296 decode_data=True) 297 conn, addr = self.server.accept() 298 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 299 decode_data=True) 300 301 def tearDown(self): 302 asyncore.close_all() 303 asyncore.socket = smtpd.socket = socket 304 smtpd.DEBUGSTREAM = self.old_debugstream 305 306 def write_line(self, line): 307 self.channel.socket.queue_recv(line) 308 self.channel.handle_read() 309 310 def test_broken_connect(self): 311 self.assertRaises( 312 DummyDispatcherBroken, BrokenDummyServer, 313 (socket_helper.HOST, 0), ('b', 0), decode_data=True) 314 315 def test_decode_data_and_enable_SMTPUTF8_raises(self): 316 self.assertRaises( 317 ValueError, smtpd.SMTPChannel, 318 self.server, self.channel.conn, self.channel.addr, 319 enable_SMTPUTF8=True, decode_data=True) 320 321 def test_server_accept(self): 322 self.server.handle_accept() 323 324 def test_missing_data(self): 325 self.write_line(b'') 326 self.assertEqual(self.channel.socket.last, 327 b'500 Error: bad syntax\r\n') 328 329 def test_EHLO(self): 330 self.write_line(b'EHLO example') 331 self.assertEqual(self.channel.socket.last, b'250 HELP\r\n') 332 333 def test_EHLO_bad_syntax(self): 334 self.write_line(b'EHLO') 335 self.assertEqual(self.channel.socket.last, 336 b'501 Syntax: EHLO hostname\r\n') 337 338 def test_EHLO_duplicate(self): 339 self.write_line(b'EHLO example') 340 self.write_line(b'EHLO example') 341 self.assertEqual(self.channel.socket.last, 342 b'503 Duplicate HELO/EHLO\r\n') 343 344 def test_EHLO_HELO_duplicate(self): 345 self.write_line(b'EHLO example') 346 self.write_line(b'HELO example') 347 self.assertEqual(self.channel.socket.last, 348 b'503 Duplicate HELO/EHLO\r\n') 349 350 def test_HELO(self): 351 name = smtpd.socket.getfqdn() 352 self.write_line(b'HELO example') 353 self.assertEqual(self.channel.socket.last, 354 '250 {}\r\n'.format(name).encode('ascii')) 355 356 def test_HELO_EHLO_duplicate(self): 357 self.write_line(b'HELO example') 358 self.write_line(b'EHLO example') 359 self.assertEqual(self.channel.socket.last, 360 b'503 Duplicate HELO/EHLO\r\n') 361 362 def test_HELP(self): 363 self.write_line(b'HELP') 364 self.assertEqual(self.channel.socket.last, 365 b'250 Supported commands: EHLO HELO MAIL RCPT ' + \ 366 b'DATA RSET NOOP QUIT VRFY\r\n') 367 368 def test_HELP_command(self): 369 self.write_line(b'HELP MAIL') 370 self.assertEqual(self.channel.socket.last, 371 b'250 Syntax: MAIL FROM: <address>\r\n') 372 373 def test_HELP_command_unknown(self): 374 self.write_line(b'HELP SPAM') 375 self.assertEqual(self.channel.socket.last, 376 b'501 Supported commands: EHLO HELO MAIL RCPT ' + \ 377 b'DATA RSET NOOP QUIT VRFY\r\n') 378 379 def test_HELO_bad_syntax(self): 380 self.write_line(b'HELO') 381 self.assertEqual(self.channel.socket.last, 382 b'501 Syntax: HELO hostname\r\n') 383 384 def test_HELO_duplicate(self): 385 self.write_line(b'HELO example') 386 self.write_line(b'HELO example') 387 self.assertEqual(self.channel.socket.last, 388 b'503 Duplicate HELO/EHLO\r\n') 389 390 def test_HELO_parameter_rejected_when_extensions_not_enabled(self): 391 self.extended_smtp = False 392 self.write_line(b'HELO example') 393 self.write_line(b'MAIL from:<foo@example.com> SIZE=1234') 394 self.assertEqual(self.channel.socket.last, 395 b'501 Syntax: MAIL FROM: <address>\r\n') 396 397 def test_MAIL_allows_space_after_colon(self): 398 self.write_line(b'HELO example') 399 self.write_line(b'MAIL from: <foo@example.com>') 400 self.assertEqual(self.channel.socket.last, 401 b'250 OK\r\n') 402 403 def test_extended_MAIL_allows_space_after_colon(self): 404 self.write_line(b'EHLO example') 405 self.write_line(b'MAIL from: <foo@example.com> size=20') 406 self.assertEqual(self.channel.socket.last, 407 b'250 OK\r\n') 408 409 def test_NOOP(self): 410 self.write_line(b'NOOP') 411 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 412 413 def test_HELO_NOOP(self): 414 self.write_line(b'HELO example') 415 self.write_line(b'NOOP') 416 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 417 418 def test_NOOP_bad_syntax(self): 419 self.write_line(b'NOOP hi') 420 self.assertEqual(self.channel.socket.last, 421 b'501 Syntax: NOOP\r\n') 422 423 def test_QUIT(self): 424 self.write_line(b'QUIT') 425 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 426 427 def test_HELO_QUIT(self): 428 self.write_line(b'HELO example') 429 self.write_line(b'QUIT') 430 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 431 432 def test_QUIT_arg_ignored(self): 433 self.write_line(b'QUIT bye bye') 434 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 435 436 def test_bad_state(self): 437 self.channel.smtp_state = 'BAD STATE' 438 self.write_line(b'HELO example') 439 self.assertEqual(self.channel.socket.last, 440 b'451 Internal confusion\r\n') 441 442 def test_command_too_long(self): 443 self.write_line(b'HELO example') 444 self.write_line(b'MAIL from: ' + 445 b'a' * self.channel.command_size_limit + 446 b'@example') 447 self.assertEqual(self.channel.socket.last, 448 b'500 Error: line too long\r\n') 449 450 def test_MAIL_command_limit_extended_with_SIZE(self): 451 self.write_line(b'EHLO example') 452 fill_len = self.channel.command_size_limit - len('MAIL from:<@example>') 453 self.write_line(b'MAIL from:<' + 454 b'a' * fill_len + 455 b'@example> SIZE=1234') 456 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 457 458 self.write_line(b'MAIL from:<' + 459 b'a' * (fill_len + 26) + 460 b'@example> SIZE=1234') 461 self.assertEqual(self.channel.socket.last, 462 b'500 Error: line too long\r\n') 463 464 def test_MAIL_command_rejects_SMTPUTF8_by_default(self): 465 self.write_line(b'EHLO example') 466 self.write_line( 467 b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8') 468 self.assertEqual(self.channel.socket.last[0:1], b'5') 469 470 def test_data_longer_than_default_data_size_limit(self): 471 # Hack the default so we don't have to generate so much data. 472 self.channel.data_size_limit = 1048 473 self.write_line(b'HELO example') 474 self.write_line(b'MAIL From:eggs@example') 475 self.write_line(b'RCPT To:spam@example') 476 self.write_line(b'DATA') 477 self.write_line(b'A' * self.channel.data_size_limit + 478 b'A\r\n.') 479 self.assertEqual(self.channel.socket.last, 480 b'552 Error: Too much mail data\r\n') 481 482 def test_MAIL_size_parameter(self): 483 self.write_line(b'EHLO example') 484 self.write_line(b'MAIL FROM:<eggs@example> SIZE=512') 485 self.assertEqual(self.channel.socket.last, 486 b'250 OK\r\n') 487 488 def test_MAIL_invalid_size_parameter(self): 489 self.write_line(b'EHLO example') 490 self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid') 491 self.assertEqual(self.channel.socket.last, 492 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 493 494 def test_MAIL_RCPT_unknown_parameters(self): 495 self.write_line(b'EHLO example') 496 self.write_line(b'MAIL FROM:<eggs@example> ham=green') 497 self.assertEqual(self.channel.socket.last, 498 b'555 MAIL FROM parameters not recognized or not implemented\r\n') 499 500 self.write_line(b'MAIL FROM:<eggs@example>') 501 self.write_line(b'RCPT TO:<eggs@example> ham=green') 502 self.assertEqual(self.channel.socket.last, 503 b'555 RCPT TO parameters not recognized or not implemented\r\n') 504 505 def test_MAIL_size_parameter_larger_than_default_data_size_limit(self): 506 self.channel.data_size_limit = 1048 507 self.write_line(b'EHLO example') 508 self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096') 509 self.assertEqual(self.channel.socket.last, 510 b'552 Error: message size exceeds fixed maximum message size\r\n') 511 512 def test_need_MAIL(self): 513 self.write_line(b'HELO example') 514 self.write_line(b'RCPT to:spam@example') 515 self.assertEqual(self.channel.socket.last, 516 b'503 Error: need MAIL command\r\n') 517 518 def test_MAIL_syntax_HELO(self): 519 self.write_line(b'HELO example') 520 self.write_line(b'MAIL from eggs@example') 521 self.assertEqual(self.channel.socket.last, 522 b'501 Syntax: MAIL FROM: <address>\r\n') 523 524 def test_MAIL_syntax_EHLO(self): 525 self.write_line(b'EHLO example') 526 self.write_line(b'MAIL from eggs@example') 527 self.assertEqual(self.channel.socket.last, 528 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 529 530 def test_MAIL_missing_address(self): 531 self.write_line(b'HELO example') 532 self.write_line(b'MAIL from:') 533 self.assertEqual(self.channel.socket.last, 534 b'501 Syntax: MAIL FROM: <address>\r\n') 535 536 def test_MAIL_chevrons(self): 537 self.write_line(b'HELO example') 538 self.write_line(b'MAIL from:<eggs@example>') 539 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 540 541 def test_MAIL_empty_chevrons(self): 542 self.write_line(b'EHLO example') 543 self.write_line(b'MAIL from:<>') 544 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 545 546 def test_MAIL_quoted_localpart(self): 547 self.write_line(b'EHLO example') 548 self.write_line(b'MAIL from: <"Fred Blogs"@example.com>') 549 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 550 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 551 552 def test_MAIL_quoted_localpart_no_angles(self): 553 self.write_line(b'EHLO example') 554 self.write_line(b'MAIL from: "Fred Blogs"@example.com') 555 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 556 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 557 558 def test_MAIL_quoted_localpart_with_size(self): 559 self.write_line(b'EHLO example') 560 self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000') 561 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 562 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 563 564 def test_MAIL_quoted_localpart_with_size_no_angles(self): 565 self.write_line(b'EHLO example') 566 self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000') 567 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 568 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 569 570 def test_nested_MAIL(self): 571 self.write_line(b'HELO example') 572 self.write_line(b'MAIL from:eggs@example') 573 self.write_line(b'MAIL from:spam@example') 574 self.assertEqual(self.channel.socket.last, 575 b'503 Error: nested MAIL command\r\n') 576 577 def test_VRFY(self): 578 self.write_line(b'VRFY eggs@example') 579 self.assertEqual(self.channel.socket.last, 580 b'252 Cannot VRFY user, but will accept message and attempt ' + \ 581 b'delivery\r\n') 582 583 def test_VRFY_syntax(self): 584 self.write_line(b'VRFY') 585 self.assertEqual(self.channel.socket.last, 586 b'501 Syntax: VRFY <address>\r\n') 587 588 def test_EXPN_not_implemented(self): 589 self.write_line(b'EXPN') 590 self.assertEqual(self.channel.socket.last, 591 b'502 EXPN not implemented\r\n') 592 593 def test_no_HELO_MAIL(self): 594 self.write_line(b'MAIL from:<foo@example.com>') 595 self.assertEqual(self.channel.socket.last, 596 b'503 Error: send HELO first\r\n') 597 598 def test_need_RCPT(self): 599 self.write_line(b'HELO example') 600 self.write_line(b'MAIL From:eggs@example') 601 self.write_line(b'DATA') 602 self.assertEqual(self.channel.socket.last, 603 b'503 Error: need RCPT command\r\n') 604 605 def test_RCPT_syntax_HELO(self): 606 self.write_line(b'HELO example') 607 self.write_line(b'MAIL From: eggs@example') 608 self.write_line(b'RCPT to eggs@example') 609 self.assertEqual(self.channel.socket.last, 610 b'501 Syntax: RCPT TO: <address>\r\n') 611 612 def test_RCPT_syntax_EHLO(self): 613 self.write_line(b'EHLO example') 614 self.write_line(b'MAIL From: eggs@example') 615 self.write_line(b'RCPT to eggs@example') 616 self.assertEqual(self.channel.socket.last, 617 b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n') 618 619 def test_RCPT_lowercase_to_OK(self): 620 self.write_line(b'HELO example') 621 self.write_line(b'MAIL From: eggs@example') 622 self.write_line(b'RCPT to: <eggs@example>') 623 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 624 625 def test_no_HELO_RCPT(self): 626 self.write_line(b'RCPT to eggs@example') 627 self.assertEqual(self.channel.socket.last, 628 b'503 Error: send HELO first\r\n') 629 630 def test_data_dialog(self): 631 self.write_line(b'HELO example') 632 self.write_line(b'MAIL From:eggs@example') 633 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 634 self.write_line(b'RCPT To:spam@example') 635 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 636 637 self.write_line(b'DATA') 638 self.assertEqual(self.channel.socket.last, 639 b'354 End data with <CR><LF>.<CR><LF>\r\n') 640 self.write_line(b'data\r\nmore\r\n.') 641 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 642 self.assertEqual(self.server.messages, 643 [(('peer-address', 'peer-port'), 644 'eggs@example', 645 ['spam@example'], 646 'data\nmore')]) 647 648 def test_DATA_syntax(self): 649 self.write_line(b'HELO example') 650 self.write_line(b'MAIL From:eggs@example') 651 self.write_line(b'RCPT To:spam@example') 652 self.write_line(b'DATA spam') 653 self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n') 654 655 def test_no_HELO_DATA(self): 656 self.write_line(b'DATA spam') 657 self.assertEqual(self.channel.socket.last, 658 b'503 Error: send HELO first\r\n') 659 660 def test_data_transparency_section_4_5_2(self): 661 self.write_line(b'HELO example') 662 self.write_line(b'MAIL From:eggs@example') 663 self.write_line(b'RCPT To:spam@example') 664 self.write_line(b'DATA') 665 self.write_line(b'..\r\n.\r\n') 666 self.assertEqual(self.channel.received_data, '.') 667 668 def test_multiple_RCPT(self): 669 self.write_line(b'HELO example') 670 self.write_line(b'MAIL From:eggs@example') 671 self.write_line(b'RCPT To:spam@example') 672 self.write_line(b'RCPT To:ham@example') 673 self.write_line(b'DATA') 674 self.write_line(b'data\r\n.') 675 self.assertEqual(self.server.messages, 676 [(('peer-address', 'peer-port'), 677 'eggs@example', 678 ['spam@example','ham@example'], 679 'data')]) 680 681 def test_manual_status(self): 682 # checks that the Channel is able to return a custom status message 683 self.write_line(b'HELO example') 684 self.write_line(b'MAIL From:eggs@example') 685 self.write_line(b'RCPT To:spam@example') 686 self.write_line(b'DATA') 687 self.write_line(b'return status\r\n.') 688 self.assertEqual(self.channel.socket.last, b'250 Okish\r\n') 689 690 def test_RSET(self): 691 self.write_line(b'HELO example') 692 self.write_line(b'MAIL From:eggs@example') 693 self.write_line(b'RCPT To:spam@example') 694 self.write_line(b'RSET') 695 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 696 self.write_line(b'MAIL From:foo@example') 697 self.write_line(b'RCPT To:eggs@example') 698 self.write_line(b'DATA') 699 self.write_line(b'data\r\n.') 700 self.assertEqual(self.server.messages, 701 [(('peer-address', 'peer-port'), 702 'foo@example', 703 ['eggs@example'], 704 'data')]) 705 706 def test_HELO_RSET(self): 707 self.write_line(b'HELO example') 708 self.write_line(b'RSET') 709 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 710 711 def test_RSET_syntax(self): 712 self.write_line(b'RSET hi') 713 self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n') 714 715 def test_unknown_command(self): 716 self.write_line(b'UNKNOWN_CMD') 717 self.assertEqual(self.channel.socket.last, 718 b'500 Error: command "UNKNOWN_CMD" not ' + \ 719 b'recognized\r\n') 720 721 def test_attribute_deprecations(self): 722 with warnings_helper.check_warnings(('', DeprecationWarning)): 723 spam = self.channel._SMTPChannel__server 724 with warnings_helper.check_warnings(('', DeprecationWarning)): 725 self.channel._SMTPChannel__server = 'spam' 726 with warnings_helper.check_warnings(('', DeprecationWarning)): 727 spam = self.channel._SMTPChannel__line 728 with warnings_helper.check_warnings(('', DeprecationWarning)): 729 self.channel._SMTPChannel__line = 'spam' 730 with warnings_helper.check_warnings(('', DeprecationWarning)): 731 spam = self.channel._SMTPChannel__state 732 with warnings_helper.check_warnings(('', DeprecationWarning)): 733 self.channel._SMTPChannel__state = 'spam' 734 with warnings_helper.check_warnings(('', DeprecationWarning)): 735 spam = self.channel._SMTPChannel__greeting 736 with warnings_helper.check_warnings(('', DeprecationWarning)): 737 self.channel._SMTPChannel__greeting = 'spam' 738 with warnings_helper.check_warnings(('', DeprecationWarning)): 739 spam = self.channel._SMTPChannel__mailfrom 740 with warnings_helper.check_warnings(('', DeprecationWarning)): 741 self.channel._SMTPChannel__mailfrom = 'spam' 742 with warnings_helper.check_warnings(('', DeprecationWarning)): 743 spam = self.channel._SMTPChannel__rcpttos 744 with warnings_helper.check_warnings(('', DeprecationWarning)): 745 self.channel._SMTPChannel__rcpttos = 'spam' 746 with warnings_helper.check_warnings(('', DeprecationWarning)): 747 spam = self.channel._SMTPChannel__data 748 with warnings_helper.check_warnings(('', DeprecationWarning)): 749 self.channel._SMTPChannel__data = 'spam' 750 with warnings_helper.check_warnings(('', DeprecationWarning)): 751 spam = self.channel._SMTPChannel__fqdn 752 with warnings_helper.check_warnings(('', DeprecationWarning)): 753 self.channel._SMTPChannel__fqdn = 'spam' 754 with warnings_helper.check_warnings(('', DeprecationWarning)): 755 spam = self.channel._SMTPChannel__peer 756 with warnings_helper.check_warnings(('', DeprecationWarning)): 757 self.channel._SMTPChannel__peer = 'spam' 758 with warnings_helper.check_warnings(('', DeprecationWarning)): 759 spam = self.channel._SMTPChannel__conn 760 with warnings_helper.check_warnings(('', DeprecationWarning)): 761 self.channel._SMTPChannel__conn = 'spam' 762 with warnings_helper.check_warnings(('', DeprecationWarning)): 763 spam = self.channel._SMTPChannel__addr 764 with warnings_helper.check_warnings(('', DeprecationWarning)): 765 self.channel._SMTPChannel__addr = 'spam' 766 767@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 768class SMTPDChannelIPv6Test(SMTPDChannelTest): 769 def setUp(self): 770 smtpd.socket = asyncore.socket = mock_socket 771 self.old_debugstream = smtpd.DEBUGSTREAM 772 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 773 self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0), 774 decode_data=True) 775 conn, addr = self.server.accept() 776 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 777 decode_data=True) 778 779class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): 780 781 def setUp(self): 782 smtpd.socket = asyncore.socket = mock_socket 783 self.old_debugstream = smtpd.DEBUGSTREAM 784 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 785 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 786 decode_data=True) 787 conn, addr = self.server.accept() 788 # Set DATA size limit to 32 bytes for easy testing 789 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, 790 decode_data=True) 791 792 def tearDown(self): 793 asyncore.close_all() 794 asyncore.socket = smtpd.socket = socket 795 smtpd.DEBUGSTREAM = self.old_debugstream 796 797 def write_line(self, line): 798 self.channel.socket.queue_recv(line) 799 self.channel.handle_read() 800 801 def test_data_limit_dialog(self): 802 self.write_line(b'HELO example') 803 self.write_line(b'MAIL From:eggs@example') 804 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 805 self.write_line(b'RCPT To:spam@example') 806 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 807 808 self.write_line(b'DATA') 809 self.assertEqual(self.channel.socket.last, 810 b'354 End data with <CR><LF>.<CR><LF>\r\n') 811 self.write_line(b'data\r\nmore\r\n.') 812 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 813 self.assertEqual(self.server.messages, 814 [(('peer-address', 'peer-port'), 815 'eggs@example', 816 ['spam@example'], 817 'data\nmore')]) 818 819 def test_data_limit_dialog_too_much_data(self): 820 self.write_line(b'HELO example') 821 self.write_line(b'MAIL From:eggs@example') 822 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 823 self.write_line(b'RCPT To:spam@example') 824 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 825 826 self.write_line(b'DATA') 827 self.assertEqual(self.channel.socket.last, 828 b'354 End data with <CR><LF>.<CR><LF>\r\n') 829 self.write_line(b'This message is longer than 32 bytes\r\n.') 830 self.assertEqual(self.channel.socket.last, 831 b'552 Error: Too much mail data\r\n') 832 833 834class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): 835 836 def setUp(self): 837 smtpd.socket = asyncore.socket = mock_socket 838 self.old_debugstream = smtpd.DEBUGSTREAM 839 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 840 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 841 conn, addr = self.server.accept() 842 self.channel = smtpd.SMTPChannel(self.server, conn, addr) 843 844 def tearDown(self): 845 asyncore.close_all() 846 asyncore.socket = smtpd.socket = socket 847 smtpd.DEBUGSTREAM = self.old_debugstream 848 849 def write_line(self, line): 850 self.channel.socket.queue_recv(line) 851 self.channel.handle_read() 852 853 def test_ascii_data(self): 854 self.write_line(b'HELO example') 855 self.write_line(b'MAIL From:eggs@example') 856 self.write_line(b'RCPT To:spam@example') 857 self.write_line(b'DATA') 858 self.write_line(b'plain ascii text') 859 self.write_line(b'.') 860 self.assertEqual(self.channel.received_data, b'plain ascii text') 861 862 def test_utf8_data(self): 863 self.write_line(b'HELO example') 864 self.write_line(b'MAIL From:eggs@example') 865 self.write_line(b'RCPT To:spam@example') 866 self.write_line(b'DATA') 867 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 868 self.write_line(b'and some plain ascii') 869 self.write_line(b'.') 870 self.assertEqual( 871 self.channel.received_data, 872 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' 873 b'and some plain ascii') 874 875 876class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): 877 878 def setUp(self): 879 smtpd.socket = asyncore.socket = mock_socket 880 self.old_debugstream = smtpd.DEBUGSTREAM 881 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 882 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 883 decode_data=True) 884 conn, addr = self.server.accept() 885 # Set decode_data to True 886 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 887 decode_data=True) 888 889 def tearDown(self): 890 asyncore.close_all() 891 asyncore.socket = smtpd.socket = socket 892 smtpd.DEBUGSTREAM = self.old_debugstream 893 894 def write_line(self, line): 895 self.channel.socket.queue_recv(line) 896 self.channel.handle_read() 897 898 def test_ascii_data(self): 899 self.write_line(b'HELO example') 900 self.write_line(b'MAIL From:eggs@example') 901 self.write_line(b'RCPT To:spam@example') 902 self.write_line(b'DATA') 903 self.write_line(b'plain ascii text') 904 self.write_line(b'.') 905 self.assertEqual(self.channel.received_data, 'plain ascii text') 906 907 def test_utf8_data(self): 908 self.write_line(b'HELO example') 909 self.write_line(b'MAIL From:eggs@example') 910 self.write_line(b'RCPT To:spam@example') 911 self.write_line(b'DATA') 912 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 913 self.write_line(b'and some plain ascii') 914 self.write_line(b'.') 915 self.assertEqual( 916 self.channel.received_data, 917 'utf8 enriched text: żźć\nand some plain ascii') 918 919 920class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): 921 def setUp(self): 922 smtpd.socket = asyncore.socket = mock_socket 923 self.old_debugstream = smtpd.DEBUGSTREAM 924 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 925 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 926 enable_SMTPUTF8=True) 927 conn, addr = self.server.accept() 928 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 929 enable_SMTPUTF8=True) 930 931 def tearDown(self): 932 asyncore.close_all() 933 asyncore.socket = smtpd.socket = socket 934 smtpd.DEBUGSTREAM = self.old_debugstream 935 936 def write_line(self, line): 937 self.channel.socket.queue_recv(line) 938 self.channel.handle_read() 939 940 def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): 941 self.write_line(b'EHLO example') 942 self.write_line( 943 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode( 944 'utf-8') 945 ) 946 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 947 948 def test_process_smtputf8_message(self): 949 self.write_line(b'EHLO example') 950 for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: 951 self.write_line(b'MAIL from: <a@example> ' + mail_parameters) 952 self.assertEqual(self.channel.socket.last[0:3], b'250') 953 self.write_line(b'rcpt to:<b@example.com>') 954 self.assertEqual(self.channel.socket.last[0:3], b'250') 955 self.write_line(b'data') 956 self.assertEqual(self.channel.socket.last[0:3], b'354') 957 self.write_line(b'c\r\n.') 958 if mail_parameters == b'': 959 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 960 else: 961 self.assertEqual(self.channel.socket.last, 962 b'250 SMTPUTF8 message okish\r\n') 963 964 def test_utf8_data(self): 965 self.write_line(b'EHLO example') 966 self.write_line( 967 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8')) 968 self.assertEqual(self.channel.socket.last[0:3], b'250') 969 self.write_line('RCPT To:späm@examplé'.encode('utf-8')) 970 self.assertEqual(self.channel.socket.last[0:3], b'250') 971 self.write_line(b'DATA') 972 self.assertEqual(self.channel.socket.last[0:3], b'354') 973 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 974 self.write_line(b'.') 975 self.assertEqual( 976 self.channel.received_data, 977 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 978 979 def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): 980 self.write_line(b'ehlo example') 981 fill_len = (512 + 26 + 10) - len('mail from:<@example>') 982 self.write_line(b'MAIL from:<' + 983 b'a' * (fill_len + 1) + 984 b'@example>') 985 self.assertEqual(self.channel.socket.last, 986 b'500 Error: line too long\r\n') 987 self.write_line(b'MAIL from:<' + 988 b'a' * fill_len + 989 b'@example>') 990 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 991 992 def test_multiple_emails_with_extended_command_length(self): 993 self.write_line(b'ehlo example') 994 fill_len = (512 + 26 + 10) - len('mail from:<@example>') 995 for char in [b'a', b'b', b'c']: 996 self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') 997 self.assertEqual(self.channel.socket.last[0:3], b'500') 998 self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') 999 self.assertEqual(self.channel.socket.last[0:3], b'250') 1000 self.write_line(b'rcpt to:<hans@example.com>') 1001 self.assertEqual(self.channel.socket.last[0:3], b'250') 1002 self.write_line(b'data') 1003 self.assertEqual(self.channel.socket.last[0:3], b'354') 1004 self.write_line(b'test\r\n.') 1005 self.assertEqual(self.channel.socket.last[0:3], b'250') 1006 1007 1008class MiscTestCase(unittest.TestCase): 1009 def test__all__(self): 1010 not_exported = { 1011 "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE", 1012 "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs", 1013 } 1014 support.check__all__(self, smtpd, not_exported=not_exported) 1015 1016 1017if __name__ == "__main__": 1018 unittest.main() 1019