1import unittest 2from test.test_email import TestEmailBase, parameterize 3import textwrap 4from email import policy 5from email.message import EmailMessage 6from email.contentmanager import ContentManager, raw_data_manager 7 8 9@parameterize 10class TestContentManager(TestEmailBase): 11 12 policy = policy.default 13 message = EmailMessage 14 15 get_key_params = { 16 'full_type': (1, 'text/plain',), 17 'maintype_only': (2, 'text',), 18 'null_key': (3, '',), 19 } 20 21 def get_key_as_get_content_key(self, order, key): 22 def foo_getter(msg, foo=None): 23 bar = msg['X-Bar-Header'] 24 return foo, bar 25 cm = ContentManager() 26 cm.add_get_handler(key, foo_getter) 27 m = self._make_message() 28 m['Content-Type'] = 'text/plain' 29 m['X-Bar-Header'] = 'foo' 30 self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo')) 31 32 def get_key_as_get_content_key_order(self, order, key): 33 def bar_getter(msg): 34 return msg['X-Bar-Header'] 35 def foo_getter(msg): 36 return msg['X-Foo-Header'] 37 cm = ContentManager() 38 cm.add_get_handler(key, foo_getter) 39 for precedence, key in self.get_key_params.values(): 40 if precedence > order: 41 cm.add_get_handler(key, bar_getter) 42 m = self._make_message() 43 m['Content-Type'] = 'text/plain' 44 m['X-Bar-Header'] = 'bar' 45 m['X-Foo-Header'] = 'foo' 46 self.assertEqual(cm.get_content(m), ('foo')) 47 48 def test_get_content_raises_if_unknown_mimetype_and_no_default(self): 49 cm = ContentManager() 50 m = self._make_message() 51 m['Content-Type'] = 'text/plain' 52 with self.assertRaisesRegex(KeyError, 'text/plain'): 53 cm.get_content(m) 54 55 class BaseThing(str): 56 pass 57 baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing' 58 class Thing(BaseThing): 59 pass 60 testobject_full_path = __name__ + '.' + 'TestContentManager.Thing' 61 62 set_key_params = { 63 'type': (0, Thing,), 64 'full_path': (1, testobject_full_path,), 65 'qualname': (2, 'TestContentManager.Thing',), 66 'name': (3, 'Thing',), 67 'base_type': (4, BaseThing,), 68 'base_full_path': (5, baseobject_full_path,), 69 'base_qualname': (6, 'TestContentManager.BaseThing',), 70 'base_name': (7, 'BaseThing',), 71 'str_type': (8, str,), 72 'str_full_path': (9, 'builtins.str',), 73 'str_name': (10, 'str',), # str name and qualname are the same 74 'null_key': (11, None,), 75 } 76 77 def set_key_as_set_content_key(self, order, key): 78 def foo_setter(msg, obj, foo=None): 79 msg['X-Foo-Header'] = foo 80 msg.set_payload(obj) 81 cm = ContentManager() 82 cm.add_set_handler(key, foo_setter) 83 m = self._make_message() 84 msg_obj = self.Thing() 85 cm.set_content(m, msg_obj, foo='bar') 86 self.assertEqual(m['X-Foo-Header'], 'bar') 87 self.assertEqual(m.get_payload(), msg_obj) 88 89 def set_key_as_set_content_key_order(self, order, key): 90 def foo_setter(msg, obj): 91 msg['X-FooBar-Header'] = 'foo' 92 msg.set_payload(obj) 93 def bar_setter(msg, obj): 94 msg['X-FooBar-Header'] = 'bar' 95 cm = ContentManager() 96 cm.add_set_handler(key, foo_setter) 97 for precedence, key in self.get_key_params.values(): 98 if precedence > order: 99 cm.add_set_handler(key, bar_setter) 100 m = self._make_message() 101 msg_obj = self.Thing() 102 cm.set_content(m, msg_obj) 103 self.assertEqual(m['X-FooBar-Header'], 'foo') 104 self.assertEqual(m.get_payload(), msg_obj) 105 106 def test_set_content_raises_if_unknown_type_and_no_default(self): 107 cm = ContentManager() 108 m = self._make_message() 109 msg_obj = self.Thing() 110 with self.assertRaisesRegex(KeyError, self.testobject_full_path): 111 cm.set_content(m, msg_obj) 112 113 def test_set_content_raises_if_called_on_multipart(self): 114 cm = ContentManager() 115 m = self._make_message() 116 m['Content-Type'] = 'multipart/foo' 117 with self.assertRaises(TypeError): 118 cm.set_content(m, 'test') 119 120 def test_set_content_calls_clear_content(self): 121 m = self._make_message() 122 m['Content-Foo'] = 'bar' 123 m['Content-Type'] = 'text/html' 124 m['To'] = 'test' 125 m.set_payload('abc') 126 cm = ContentManager() 127 cm.add_set_handler(str, lambda *args, **kw: None) 128 m.set_content('xyz', content_manager=cm) 129 self.assertIsNone(m['Content-Foo']) 130 self.assertIsNone(m['Content-Type']) 131 self.assertEqual(m['To'], 'test') 132 self.assertIsNone(m.get_payload()) 133 134 135@parameterize 136class TestRawDataManager(TestEmailBase): 137 # Note: these tests are dependent on the order in which headers are added 138 # to the message objects by the code. There's no defined ordering in 139 # RFC5322/MIME, so this makes the tests more fragile than the standards 140 # require. However, if the header order changes it is best to understand 141 # *why*, and make sure it isn't a subtle bug in whatever change was 142 # applied. 143 144 policy = policy.default.clone(max_line_length=60, 145 content_manager=raw_data_manager) 146 message = EmailMessage 147 148 def test_get_text_plain(self): 149 m = self._str_msg(textwrap.dedent("""\ 150 Content-Type: text/plain 151 152 Basic text. 153 """)) 154 self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n") 155 156 def test_get_text_html(self): 157 m = self._str_msg(textwrap.dedent("""\ 158 Content-Type: text/html 159 160 <p>Basic text.</p> 161 """)) 162 self.assertEqual(raw_data_manager.get_content(m), 163 "<p>Basic text.</p>\n") 164 165 def test_get_text_plain_latin1(self): 166 m = self._bytes_msg(textwrap.dedent("""\ 167 Content-Type: text/plain; charset=latin1 168 169 Basìc tëxt. 170 """).encode('latin1')) 171 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") 172 173 def test_get_text_plain_latin1_quoted_printable(self): 174 m = self._str_msg(textwrap.dedent("""\ 175 Content-Type: text/plain; charset="latin-1" 176 Content-Transfer-Encoding: quoted-printable 177 178 Bas=ECc t=EBxt. 179 """)) 180 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") 181 182 def test_get_text_plain_utf8_base64(self): 183 m = self._str_msg(textwrap.dedent("""\ 184 Content-Type: text/plain; charset="utf8" 185 Content-Transfer-Encoding: base64 186 187 QmFzw6xjIHTDq3h0Lgo= 188 """)) 189 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") 190 191 def test_get_text_plain_bad_utf8_quoted_printable(self): 192 m = self._str_msg(textwrap.dedent("""\ 193 Content-Type: text/plain; charset="utf8" 194 Content-Transfer-Encoding: quoted-printable 195 196 Bas=c3=acc t=c3=abxt=fd. 197 """)) 198 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n") 199 200 def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self): 201 m = self._str_msg(textwrap.dedent("""\ 202 Content-Type: text/plain; charset="utf8" 203 Content-Transfer-Encoding: quoted-printable 204 205 Bas=c3=acc t=c3=abxt=fd. 206 """)) 207 self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), 208 "Basìc tëxt.\n") 209 210 def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self): 211 m = self._str_msg(textwrap.dedent("""\ 212 Content-Type: text/plain; charset="utf8" 213 Content-Transfer-Encoding: base64 214 215 QmFzw6xjIHTDq3h0Lgo\xFF= 216 """)) 217 self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), 218 "Basìc tëxt.\n") 219 220 def test_get_text_invalid_keyword(self): 221 m = self._str_msg(textwrap.dedent("""\ 222 Content-Type: text/plain 223 224 Basic text. 225 """)) 226 with self.assertRaises(TypeError): 227 raw_data_manager.get_content(m, foo='ignore') 228 229 def test_get_non_text(self): 230 template = textwrap.dedent("""\ 231 Content-Type: {} 232 Content-Transfer-Encoding: base64 233 234 Ym9ndXMgZGF0YQ== 235 """) 236 for maintype in 'audio image video application'.split(): 237 with self.subTest(maintype=maintype): 238 m = self._str_msg(template.format(maintype+'/foo')) 239 self.assertEqual(raw_data_manager.get_content(m), b"bogus data") 240 241 def test_get_non_text_invalid_keyword(self): 242 m = self._str_msg(textwrap.dedent("""\ 243 Content-Type: image/jpg 244 Content-Transfer-Encoding: base64 245 246 Ym9ndXMgZGF0YQ== 247 """)) 248 with self.assertRaises(TypeError): 249 raw_data_manager.get_content(m, errors='ignore') 250 251 def test_get_raises_on_multipart(self): 252 m = self._str_msg(textwrap.dedent("""\ 253 Content-Type: multipart/mixed; boundary="===" 254 255 --=== 256 --===-- 257 """)) 258 with self.assertRaises(KeyError): 259 raw_data_manager.get_content(m) 260 261 def test_get_message_rfc822_and_external_body(self): 262 template = textwrap.dedent("""\ 263 Content-Type: message/{} 264 265 To: foo@example.com 266 From: bar@example.com 267 Subject: example 268 269 an example message 270 """) 271 for subtype in 'rfc822 external-body'.split(): 272 with self.subTest(subtype=subtype): 273 m = self._str_msg(template.format(subtype)) 274 sub_msg = raw_data_manager.get_content(m) 275 self.assertIsInstance(sub_msg, self.message) 276 self.assertEqual(raw_data_manager.get_content(sub_msg), 277 "an example message\n") 278 self.assertEqual(sub_msg['to'], 'foo@example.com') 279 self.assertEqual(sub_msg['from'].addresses[0].username, 'bar') 280 281 def test_get_message_non_rfc822_or_external_body_yields_bytes(self): 282 m = self._str_msg(textwrap.dedent("""\ 283 Content-Type: message/partial 284 285 To: foo@example.com 286 From: bar@example.com 287 Subject: example 288 289 The real body is in another message. 290 """)) 291 self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex') 292 293 def test_set_text_plain(self): 294 m = self._make_message() 295 content = "Simple message.\n" 296 raw_data_manager.set_content(m, content) 297 self.assertEqual(str(m), textwrap.dedent("""\ 298 Content-Type: text/plain; charset="utf-8" 299 Content-Transfer-Encoding: 7bit 300 301 Simple message. 302 """)) 303 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 304 self.assertEqual(m.get_content(), content) 305 306 def test_set_text_plain_null(self): 307 m = self._make_message() 308 content = '' 309 raw_data_manager.set_content(m, content) 310 self.assertEqual(str(m), textwrap.dedent("""\ 311 Content-Type: text/plain; charset="utf-8" 312 Content-Transfer-Encoding: 7bit 313 314 315 """)) 316 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), '\n') 317 self.assertEqual(m.get_content(), '\n') 318 319 def test_set_text_html(self): 320 m = self._make_message() 321 content = "<p>Simple message.</p>\n" 322 raw_data_manager.set_content(m, content, subtype='html') 323 self.assertEqual(str(m), textwrap.dedent("""\ 324 Content-Type: text/html; charset="utf-8" 325 Content-Transfer-Encoding: 7bit 326 327 <p>Simple message.</p> 328 """)) 329 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 330 self.assertEqual(m.get_content(), content) 331 332 def test_set_text_charset_latin_1(self): 333 m = self._make_message() 334 content = "Simple message.\n" 335 raw_data_manager.set_content(m, content, charset='latin-1') 336 self.assertEqual(str(m), textwrap.dedent("""\ 337 Content-Type: text/plain; charset="iso-8859-1" 338 Content-Transfer-Encoding: 7bit 339 340 Simple message. 341 """)) 342 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 343 self.assertEqual(m.get_content(), content) 344 345 def test_set_text_plain_long_line_heuristics(self): 346 m = self._make_message() 347 content = ("Simple but long message that is over 78 characters" 348 " long to force transfer encoding.\n") 349 raw_data_manager.set_content(m, content) 350 self.assertEqual(str(m), textwrap.dedent("""\ 351 Content-Type: text/plain; charset="utf-8" 352 Content-Transfer-Encoding: quoted-printable 353 354 Simple but long message that is over 78 characters long to = 355 force transfer encoding. 356 """)) 357 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 358 self.assertEqual(m.get_content(), content) 359 360 def test_set_text_short_line_minimal_non_ascii_heuristics(self): 361 m = self._make_message() 362 content = "et là il est monté sur moi et il commence à m'éto.\n" 363 raw_data_manager.set_content(m, content) 364 self.assertEqual(bytes(m), textwrap.dedent("""\ 365 Content-Type: text/plain; charset="utf-8" 366 Content-Transfer-Encoding: 8bit 367 368 et là il est monté sur moi et il commence à m'éto. 369 """).encode('utf-8')) 370 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 371 self.assertEqual(m.get_content(), content) 372 373 def test_set_text_long_line_minimal_non_ascii_heuristics(self): 374 m = self._make_message() 375 content = ("j'ai un problème de python. il est sorti de son" 376 " vivarium. et là il est monté sur moi et il commence" 377 " à m'éto.\n") 378 raw_data_manager.set_content(m, content) 379 self.assertEqual(bytes(m), textwrap.dedent("""\ 380 Content-Type: text/plain; charset="utf-8" 381 Content-Transfer-Encoding: quoted-printable 382 383 j'ai un probl=C3=A8me de python. il est sorti de son vivari= 384 um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = 385 =C3=A0 m'=C3=A9to. 386 """).encode('utf-8')) 387 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 388 self.assertEqual(m.get_content(), content) 389 390 def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self): 391 m = self._make_message() 392 content = '\n'*10 + ( 393 "j'ai un problème de python. il est sorti de son" 394 " vivarium. et là il est monté sur moi et il commence" 395 " à m'éto.\n") 396 raw_data_manager.set_content(m, content) 397 self.assertEqual(bytes(m), textwrap.dedent("""\ 398 Content-Type: text/plain; charset="utf-8" 399 Content-Transfer-Encoding: quoted-printable 400 """ + '\n'*10 + """ 401 j'ai un probl=C3=A8me de python. il est sorti de son vivari= 402 um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = 403 =C3=A0 m'=C3=A9to. 404 """).encode('utf-8')) 405 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 406 self.assertEqual(m.get_content(), content) 407 408 def test_set_text_maximal_non_ascii_heuristics(self): 409 m = self._make_message() 410 content = "áàäéèęöő.\n" 411 raw_data_manager.set_content(m, content) 412 self.assertEqual(bytes(m), textwrap.dedent("""\ 413 Content-Type: text/plain; charset="utf-8" 414 Content-Transfer-Encoding: 8bit 415 416 áàäéèęöő. 417 """).encode('utf-8')) 418 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 419 self.assertEqual(m.get_content(), content) 420 421 def test_set_text_11_lines_maximal_non_ascii_heuristics(self): 422 m = self._make_message() 423 content = '\n'*10 + "áàäéèęöő.\n" 424 raw_data_manager.set_content(m, content) 425 self.assertEqual(bytes(m), textwrap.dedent("""\ 426 Content-Type: text/plain; charset="utf-8" 427 Content-Transfer-Encoding: 8bit 428 """ + '\n'*10 + """ 429 áàäéèęöő. 430 """).encode('utf-8')) 431 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 432 self.assertEqual(m.get_content(), content) 433 434 def test_set_text_long_line_maximal_non_ascii_heuristics(self): 435 m = self._make_message() 436 content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 437 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 438 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") 439 raw_data_manager.set_content(m, content) 440 self.assertEqual(bytes(m), textwrap.dedent("""\ 441 Content-Type: text/plain; charset="utf-8" 442 Content-Transfer-Encoding: base64 443 444 w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD 445 tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo 446 xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD 447 qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg 448 w6TDqcOoxJnDtsWRLgo= 449 """).encode('utf-8')) 450 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 451 self.assertEqual(m.get_content(), content) 452 453 def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self): 454 # Yes, it chooses "wrong" here. It's a heuristic. So this result 455 # could change if we come up with a better heuristic. 456 m = self._make_message() 457 content = ('\n'*10 + 458 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 459 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 460 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") 461 raw_data_manager.set_content(m, "\n"*10 + 462 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 463 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 464 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") 465 self.assertEqual(bytes(m), textwrap.dedent("""\ 466 Content-Type: text/plain; charset="utf-8" 467 Content-Transfer-Encoding: quoted-printable 468 """ + '\n'*10 + """ 469 =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3= 470 =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4= 471 =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3= 472 =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99= 473 =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5= 474 =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1= 475 =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3= 476 =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9= 477 =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4= 478 =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6= 479 =C5=91. 480 """).encode('utf-8')) 481 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 482 self.assertEqual(m.get_content(), content) 483 484 def test_set_text_non_ascii_with_cte_7bit_raises(self): 485 m = self._make_message() 486 with self.assertRaises(UnicodeError): 487 raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit') 488 489 def test_set_text_non_ascii_with_charset_ascii_raises(self): 490 m = self._make_message() 491 with self.assertRaises(UnicodeError): 492 raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii') 493 494 def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self): 495 m = self._make_message() 496 with self.assertRaises(UnicodeError): 497 raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii') 498 499 def test_set_message(self): 500 m = self._make_message() 501 m['Subject'] = "Forwarded message" 502 content = self._make_message() 503 content['To'] = 'python@vivarium.org' 504 content['From'] = 'police@monty.org' 505 content['Subject'] = "get back in your box" 506 content.set_content("Or face the comfy chair.") 507 raw_data_manager.set_content(m, content) 508 self.assertEqual(str(m), textwrap.dedent("""\ 509 Subject: Forwarded message 510 Content-Type: message/rfc822 511 Content-Transfer-Encoding: 8bit 512 513 To: python@vivarium.org 514 From: police@monty.org 515 Subject: get back in your box 516 Content-Type: text/plain; charset="utf-8" 517 Content-Transfer-Encoding: 7bit 518 MIME-Version: 1.0 519 520 Or face the comfy chair. 521 """)) 522 payload = m.get_payload(0) 523 self.assertIsInstance(payload, self.message) 524 self.assertEqual(str(payload), str(content)) 525 self.assertIsInstance(m.get_content(), self.message) 526 self.assertEqual(str(m.get_content()), str(content)) 527 528 def test_set_message_with_non_ascii_and_coercion_to_7bit(self): 529 m = self._make_message() 530 m['Subject'] = "Escape report" 531 content = self._make_message() 532 content['To'] = 'police@monty.org' 533 content['From'] = 'victim@monty.org' 534 content['Subject'] = "Help" 535 content.set_content("j'ai un problème de python. il est sorti de son" 536 " vivarium.") 537 raw_data_manager.set_content(m, content) 538 self.assertEqual(bytes(m), textwrap.dedent("""\ 539 Subject: Escape report 540 Content-Type: message/rfc822 541 Content-Transfer-Encoding: 8bit 542 543 To: police@monty.org 544 From: victim@monty.org 545 Subject: Help 546 Content-Type: text/plain; charset="utf-8" 547 Content-Transfer-Encoding: 8bit 548 MIME-Version: 1.0 549 550 j'ai un problème de python. il est sorti de son vivarium. 551 """).encode('utf-8')) 552 # The choice of base64 for the body encoding is because generator 553 # doesn't bother with heuristics and uses it unconditionally for utf-8 554 # text. 555 # XXX: the first cte should be 7bit, too...that's a generator bug. 556 # XXX: the line length in the body also looks like a generator bug. 557 self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length), 558 textwrap.dedent("""\ 559 Subject: Escape report 560 Content-Type: message/rfc822 561 Content-Transfer-Encoding: 8bit 562 563 To: police@monty.org 564 From: victim@monty.org 565 Subject: Help 566 Content-Type: text/plain; charset="utf-8" 567 Content-Transfer-Encoding: base64 568 MIME-Version: 1.0 569 570 aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt 571 Lgo= 572 """)) 573 self.assertIsInstance(m.get_content(), self.message) 574 self.assertEqual(str(m.get_content()), str(content)) 575 576 def test_set_message_invalid_cte_raises(self): 577 m = self._make_message() 578 content = self._make_message() 579 for cte in 'quoted-printable base64'.split(): 580 for subtype in 'rfc822 external-body'.split(): 581 with self.subTest(cte=cte, subtype=subtype): 582 with self.assertRaises(ValueError) as ar: 583 m.set_content(content, subtype, cte=cte) 584 exc = str(ar.exception) 585 self.assertIn(cte, exc) 586 self.assertIn(subtype, exc) 587 subtype = 'external-body' 588 for cte in '8bit binary'.split(): 589 with self.subTest(cte=cte, subtype=subtype): 590 with self.assertRaises(ValueError) as ar: 591 m.set_content(content, subtype, cte=cte) 592 exc = str(ar.exception) 593 self.assertIn(cte, exc) 594 self.assertIn(subtype, exc) 595 596 def test_set_image_jpg(self): 597 for content in (b"bogus content", 598 bytearray(b"bogus content"), 599 memoryview(b"bogus content")): 600 with self.subTest(content=content): 601 m = self._make_message() 602 raw_data_manager.set_content(m, content, 'image', 'jpeg') 603 self.assertEqual(str(m), textwrap.dedent("""\ 604 Content-Type: image/jpeg 605 Content-Transfer-Encoding: base64 606 607 Ym9ndXMgY29udGVudA== 608 """)) 609 self.assertEqual(m.get_payload(decode=True), content) 610 self.assertEqual(m.get_content(), content) 611 612 def test_set_audio_aif_with_quoted_printable_cte(self): 613 # Why you would use qp, I don't know, but it is technically supported. 614 # XXX: the incorrect line length is because binascii.b2a_qp doesn't 615 # support a line length parameter, but we must use it to get newline 616 # encoding. 617 # XXX: what about that lack of tailing newline? Do we actually handle 618 # that correctly in all cases? That is, if the *source* has an 619 # unencoded newline, do we add an extra newline to the returned payload 620 # or not? And can that actually be disambiguated based on the RFC? 621 m = self._make_message() 622 content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 623 m.set_content(content, 'audio', 'aif', cte='quoted-printable') 624 self.assertEqual(bytes(m), textwrap.dedent("""\ 625 Content-Type: audio/aif 626 Content-Transfer-Encoding: quoted-printable 627 MIME-Version: 1.0 628 629 b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz= 630 zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1')) 631 self.assertEqual(m.get_payload(decode=True), content) 632 self.assertEqual(m.get_content(), content) 633 634 def test_set_video_mpeg_with_binary_cte(self): 635 m = self._make_message() 636 content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 637 m.set_content(content, 'video', 'mpeg', cte='binary') 638 self.assertEqual(bytes(m), textwrap.dedent("""\ 639 Content-Type: video/mpeg 640 Content-Transfer-Encoding: binary 641 MIME-Version: 1.0 642 643 """).encode('ascii') + 644 # XXX: the second \n ought to be a \r, but generator gets it wrong. 645 # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE. 646 b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' + 647 b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz') 648 self.assertEqual(m.get_payload(decode=True), content) 649 self.assertEqual(m.get_content(), content) 650 651 def test_set_application_octet_stream_with_8bit_cte(self): 652 # In 8bit mode, universal line end logic applies. It is up to the 653 # application to make sure the lines are short enough; we don't check. 654 m = self._make_message() 655 content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n' 656 m.set_content(content, 'application', 'octet-stream', cte='8bit') 657 self.assertEqual(bytes(m), textwrap.dedent("""\ 658 Content-Type: application/octet-stream 659 Content-Transfer-Encoding: 8bit 660 MIME-Version: 1.0 661 662 """).encode('ascii') + 663 b'b\xFFgus\tcon\nt\nent\n' + 664 b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n') 665 self.assertEqual(m.get_payload(decode=True), content) 666 self.assertEqual(m.get_content(), content) 667 668 def test_set_headers_from_header_objects(self): 669 m = self._make_message() 670 content = "Simple message.\n" 671 header_factory = self.policy.header_factory 672 raw_data_manager.set_content(m, content, headers=( 673 header_factory("To", "foo@example.com"), 674 header_factory("From", "foo@example.com"), 675 header_factory("Subject", "I'm talking to myself."))) 676 self.assertEqual(str(m), textwrap.dedent("""\ 677 Content-Type: text/plain; charset="utf-8" 678 To: foo@example.com 679 From: foo@example.com 680 Subject: I'm talking to myself. 681 Content-Transfer-Encoding: 7bit 682 683 Simple message. 684 """)) 685 686 def test_set_headers_from_strings(self): 687 m = self._make_message() 688 content = "Simple message.\n" 689 raw_data_manager.set_content(m, content, headers=( 690 "X-Foo-Header: foo", 691 "X-Bar-Header: bar",)) 692 self.assertEqual(str(m), textwrap.dedent("""\ 693 Content-Type: text/plain; charset="utf-8" 694 X-Foo-Header: foo 695 X-Bar-Header: bar 696 Content-Transfer-Encoding: 7bit 697 698 Simple message. 699 """)) 700 701 def test_set_headers_with_invalid_duplicate_string_header_raises(self): 702 m = self._make_message() 703 content = "Simple message.\n" 704 with self.assertRaisesRegex(ValueError, 'Content-Type'): 705 raw_data_manager.set_content(m, content, headers=( 706 "Content-Type: foo/bar",) 707 ) 708 709 def test_set_headers_with_invalid_duplicate_header_header_raises(self): 710 m = self._make_message() 711 content = "Simple message.\n" 712 header_factory = self.policy.header_factory 713 with self.assertRaisesRegex(ValueError, 'Content-Type'): 714 raw_data_manager.set_content(m, content, headers=( 715 header_factory("Content-Type", " foo/bar"),) 716 ) 717 718 def test_set_headers_with_defective_string_header_raises(self): 719 m = self._make_message() 720 content = "Simple message.\n" 721 with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): 722 raw_data_manager.set_content(m, content, headers=( 723 'To: a@fairly@@invalid@address',) 724 ) 725 print(m['To'].defects) 726 727 def test_set_headers_with_defective_header_header_raises(self): 728 m = self._make_message() 729 content = "Simple message.\n" 730 header_factory = self.policy.header_factory 731 with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): 732 raw_data_manager.set_content(m, content, headers=( 733 header_factory('To', 'a@fairly@@invalid@address'),) 734 ) 735 print(m['To'].defects) 736 737 def test_set_disposition_inline(self): 738 m = self._make_message() 739 m.set_content('foo', disposition='inline') 740 self.assertEqual(m['Content-Disposition'], 'inline') 741 742 def test_set_disposition_attachment(self): 743 m = self._make_message() 744 m.set_content('foo', disposition='attachment') 745 self.assertEqual(m['Content-Disposition'], 'attachment') 746 747 def test_set_disposition_foo(self): 748 m = self._make_message() 749 m.set_content('foo', disposition='foo') 750 self.assertEqual(m['Content-Disposition'], 'foo') 751 752 # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that 753 # would cause 'foo' above to raise. 754 755 def test_set_filename(self): 756 m = self._make_message() 757 m.set_content('foo', filename='bar.txt') 758 self.assertEqual(m['Content-Disposition'], 759 'attachment; filename="bar.txt"') 760 761 def test_set_filename_and_disposition_inline(self): 762 m = self._make_message() 763 m.set_content('foo', disposition='inline', filename='bar.txt') 764 self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"') 765 766 def test_set_non_ascii_filename(self): 767 m = self._make_message() 768 m.set_content('foo', filename='ábárî.txt') 769 self.assertEqual(bytes(m), textwrap.dedent("""\ 770 Content-Type: text/plain; charset="utf-8" 771 Content-Transfer-Encoding: 7bit 772 Content-Disposition: attachment; 773 filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt 774 MIME-Version: 1.0 775 776 foo 777 """).encode('ascii')) 778 779 def test_set_content_bytes_cte_7bit(self): 780 m = self._make_message() 781 m.set_content(b'ASCII-only message.\n', 782 maintype='application', subtype='octet-stream', cte='7bit') 783 self.assertEqual(str(m), textwrap.dedent("""\ 784 Content-Type: application/octet-stream 785 Content-Transfer-Encoding: 7bit 786 MIME-Version: 1.0 787 788 ASCII-only message. 789 """)) 790 791 content_object_params = { 792 'text_plain': ('content', ()), 793 'text_html': ('content', ('html',)), 794 'application_octet_stream': (b'content', 795 ('application', 'octet_stream')), 796 'image_jpeg': (b'content', ('image', 'jpeg')), 797 'message_rfc822': (message(), ()), 798 'message_external_body': (message(), ('external-body',)), 799 } 800 801 def content_object_as_header_receiver(self, obj, mimetype): 802 m = self._make_message() 803 m.set_content(obj, *mimetype, headers=( 804 'To: foo@example.com', 805 'From: bar@simple.net')) 806 self.assertEqual(m['to'], 'foo@example.com') 807 self.assertEqual(m['from'], 'bar@simple.net') 808 809 def content_object_as_disposition_inline_receiver(self, obj, mimetype): 810 m = self._make_message() 811 m.set_content(obj, *mimetype, disposition='inline') 812 self.assertEqual(m['Content-Disposition'], 'inline') 813 814 def content_object_as_non_ascii_filename_receiver(self, obj, mimetype): 815 m = self._make_message() 816 m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt') 817 self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"') 818 self.assertEqual(m.get_filename(), "bár.txt") 819 self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt") 820 821 def content_object_as_cid_receiver(self, obj, mimetype): 822 m = self._make_message() 823 m.set_content(obj, *mimetype, cid='some_random_stuff') 824 self.assertEqual(m['Content-ID'], 'some_random_stuff') 825 826 def content_object_as_params_receiver(self, obj, mimetype): 827 m = self._make_message() 828 params = {'foo': 'bár', 'abc': 'xyz'} 829 m.set_content(obj, *mimetype, params=params) 830 if isinstance(obj, str): 831 params['charset'] = 'utf-8' 832 self.assertEqual(m['Content-Type'].params, params) 833 834 835if __name__ == '__main__': 836 unittest.main() 837