• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import email
3import unittest
4from email.message import Message, EmailMessage
5from email.policy import default
6from test.test_email import TestEmailBase
7
8
9class TestCustomMessage(TestEmailBase):
10
11    class MyMessage(Message):
12        def __init__(self, policy):
13            self.check_policy = policy
14            super().__init__()
15
16    MyPolicy = TestEmailBase.policy.clone(linesep='boo')
17
18    def test_custom_message_gets_policy_if_possible_from_string(self):
19        msg = email.message_from_string("Subject: bogus\n\nmsg\n",
20                                        self.MyMessage,
21                                        policy=self.MyPolicy)
22        self.assertIsInstance(msg, self.MyMessage)
23        self.assertIs(msg.check_policy, self.MyPolicy)
24
25    def test_custom_message_gets_policy_if_possible_from_file(self):
26        source_file = io.StringIO("Subject: bogus\n\nmsg\n")
27        msg = email.message_from_file(source_file,
28                                      self.MyMessage,
29                                      policy=self.MyPolicy)
30        self.assertIsInstance(msg, self.MyMessage)
31        self.assertIs(msg.check_policy, self.MyPolicy)
32
33    # XXX add tests for other functions that take Message arg.
34
35
36class TestParserBase:
37
38    def test_only_split_on_cr_lf(self):
39        # The unicode line splitter splits on unicode linebreaks, which are
40        # more numerous than allowed by the email RFCs; make sure we are only
41        # splitting on those two.
42        for parser in self.parsers:
43            with self.subTest(parser=parser.__name__):
44                msg = parser(
45                    "Next-Line: not\x85broken\r\n"
46                    "Null: not\x00broken\r\n"
47                    "Vertical-Tab: not\vbroken\r\n"
48                    "Form-Feed: not\fbroken\r\n"
49                    "File-Separator: not\x1Cbroken\r\n"
50                    "Group-Separator: not\x1Dbroken\r\n"
51                    "Record-Separator: not\x1Ebroken\r\n"
52                    "Line-Separator: not\u2028broken\r\n"
53                    "Paragraph-Separator: not\u2029broken\r\n"
54                    "\r\n",
55                    policy=default,
56                )
57                self.assertEqual(msg.items(), [
58                    ("Next-Line", "not\x85broken"),
59                    ("Null", "not\x00broken"),
60                    ("Vertical-Tab", "not\vbroken"),
61                    ("Form-Feed", "not\fbroken"),
62                    ("File-Separator", "not\x1Cbroken"),
63                    ("Group-Separator", "not\x1Dbroken"),
64                    ("Record-Separator", "not\x1Ebroken"),
65                    ("Line-Separator", "not\u2028broken"),
66                    ("Paragraph-Separator", "not\u2029broken"),
67                ])
68                self.assertEqual(msg.get_payload(), "")
69
70    class MyMessage(EmailMessage):
71        pass
72
73    def test_custom_message_factory_on_policy(self):
74        for parser in self.parsers:
75            with self.subTest(parser=parser.__name__):
76                MyPolicy = default.clone(message_factory=self.MyMessage)
77                msg = parser("To: foo\n\ntest", policy=MyPolicy)
78                self.assertIsInstance(msg, self.MyMessage)
79
80    def test_factory_arg_overrides_policy(self):
81        for parser in self.parsers:
82            with self.subTest(parser=parser.__name__):
83                MyPolicy = default.clone(message_factory=self.MyMessage)
84                msg = parser("To: foo\n\ntest", Message, policy=MyPolicy)
85                self.assertNotIsInstance(msg, self.MyMessage)
86                self.assertIsInstance(msg, Message)
87
88# Play some games to get nice output in subTest.  This code could be clearer
89# if staticmethod supported __name__.
90
91def message_from_file(s, *args, **kw):
92    f = io.StringIO(s)
93    return email.message_from_file(f, *args, **kw)
94
95class TestParser(TestParserBase, TestEmailBase):
96    parsers = (email.message_from_string, message_from_file)
97
98def message_from_bytes(s, *args, **kw):
99    return email.message_from_bytes(s.encode(), *args, **kw)
100
101def message_from_binary_file(s, *args, **kw):
102    f = io.BytesIO(s.encode())
103    return email.message_from_binary_file(f, *args, **kw)
104
105class TestBytesParser(TestParserBase, TestEmailBase):
106    parsers = (message_from_bytes, message_from_binary_file)
107
108
109if __name__ == '__main__':
110    unittest.main()
111