• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import types
3import textwrap
4import unittest
5import email.policy
6import email.parser
7import email.generator
8import email.message
9from email import headerregistry
10
def make_defaults(base_defaults, differences):
    """Return a copy of *base_defaults* overlaid with *differences*.

    Neither argument is modified.  Keys present in *differences* replace
    the corresponding entries of the copy; keys only present in
    *differences* are added to it.
    """
    defaults = base_defaults.copy()
    defaults.update(differences)
    return defaults
15
class PolicyAPITests(unittest.TestCase):
    """Tests for the API common to the policy objects in email.policy.

    The ``policies`` table maps each policy instance under test to the
    attribute values that instance is expected to expose.  Most of the
    tests below iterate over that table.
    """

    longMessage = True

    # Base default values.
    compat32_defaults = {
        'max_line_length':          78,
        'linesep':                  '\n',
        'cte_type':                 '8bit',
        'raise_on_defect':          False,
        'mangle_from_':             True,
        'message_factory':          None,
        }
    # These default values are the ones set on email.policy.default.
    # If any of these defaults change, the docs must be updated.
    policy_defaults = compat32_defaults.copy()
    policy_defaults.update({
        'utf8':                     False,
        'raise_on_defect':          False,
        'header_factory':           email.policy.EmailPolicy.header_factory,
        'refold_source':            'long',
        'content_manager':          email.policy.EmailPolicy.content_manager,
        'mangle_from_':             False,
        'message_factory':          email.message.EmailMessage,
        })

    # For each policy under test, we give here what we expect the defaults to
    # be for that policy.  The second argument to make_defaults is the
    # difference between the base defaults and that for the particular policy.
    new_policy = email.policy.EmailPolicy()
    policies = {
        email.policy.compat32: make_defaults(compat32_defaults, {}),
        email.policy.default: make_defaults(policy_defaults, {}),
        email.policy.SMTP: make_defaults(policy_defaults,
                                         {'linesep': '\r\n'}),
        email.policy.SMTPUTF8: make_defaults(policy_defaults,
                                             {'linesep': '\r\n',
                                              'utf8': True}),
        email.policy.HTTP: make_defaults(policy_defaults,
                                         {'linesep': '\r\n',
                                          'max_line_length': None}),
        email.policy.strict: make_defaults(policy_defaults,
                                           {'raise_on_defect': True}),
        new_policy: make_defaults(policy_defaults, {}),
        }
    # Creating a new policy creates a new header factory.  There is a test
    # later that proves this.
    policies[new_policy]['header_factory'] = new_policy.header_factory

    def test_defaults(self):
        # Every attribute listed in the table must have the listed value.
        for policy, expected in self.policies.items():
            for attr, value in expected.items():
                with self.subTest(policy=policy, attr=attr):
                    self.assertEqual(getattr(policy, attr), value,
                                    ("change {} docs/docstrings if defaults have "
                                    "changed").format(policy))

    def test_all_attributes_covered(self):
        # Every public, non-method attribute of a policy must appear in the
        # table, so that a newly added attribute cannot go untested.
        for policy, expected in self.policies.items():
            for attr in dir(policy):
                with self.subTest(policy=policy, attr=attr):
                    if (attr.startswith('_') or
                            isinstance(getattr(email.policy.EmailPolicy, attr),
                                       types.FunctionType)):
                        continue
                    self.assertIn(attr, expected,
                                  "{} is not fully tested".format(attr))

    def test_abc(self):
        # Policy is abstract: instantiating it must fail with a message that
        # names each of the methods a subclass has to provide.
        with self.assertRaises(TypeError) as cm:
            email.policy.Policy()
        msg = str(cm.exception)
        abstract_methods = ('fold',
                            'fold_binary',
                            'header_fetch_parse',
                            'header_source_parse',
                            'header_store_parse')
        for method in abstract_methods:
            self.assertIn(method, msg)

    def test_policy_is_immutable(self):
        for policy, defaults in self.policies.items():
            for attr in defaults:
                with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
                    setattr(policy, attr, None)
            with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
                policy.foo = None

    def test_set_policy_attrs_when_cloned(self):
        # Override every attribute with None in the clone call and check that
        # each override took effect.  (A few table entries already default to
        # None, e.g. message_factory for compat32; for those the check only
        # confirms that clone() accepts the keyword.)
        for policy, defaults in self.policies.items():
            overrides = {attr: None for attr in defaults}
            cloned = policy.clone(**overrides)
            for attr in defaults:
                self.assertIsNone(getattr(cloned, attr))

    def test_reject_non_policy_keyword_when_called(self):
        # The table holds policy *instances*, which are not callable; calling
        # one would raise TypeError no matter what keyword was passed.  Call
        # the instance's class instead so that it is the constructor's
        # keyword validation that is being exercised.
        for policy in self.policies:
            policyclass = type(policy)
            with self.subTest(policyclass=policyclass):
                with self.assertRaises(TypeError):
                    policyclass(this_keyword_should_not_be_valid=None)
                with self.assertRaises(TypeError):
                    policyclass(newtline=None)

    def test_policy_addition(self):
        # When two policies are added, the right-hand operand's explicitly
        # set attributes override those of the left-hand operand.
        expected = self.policy_defaults.copy()
        p1 = email.policy.default.clone(max_line_length=100)
        p2 = email.policy.default.clone(max_line_length=50)
        added = p1 + p2
        expected.update(max_line_length=50)
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)
        added = p2 + p1
        expected.update(max_line_length=100)
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)
        # Adding a policy that sets nothing explicitly changes nothing.
        added = added + email.policy.default
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)

    def test_register_defect(self):
        class Dummy:
            def __init__(self):
                self.defects = []
        obj = Dummy()
        defect = object()
        policy = email.policy.EmailPolicy()
        policy.register_defect(obj, defect)
        self.assertEqual(obj.defects, [defect])
        defect2 = object()
        policy.register_defect(obj, defect2)
        self.assertEqual(obj.defects, [defect, defect2])

    # Minimal stand-in for an object that can have defects recorded on it.
    class MyObj:
        def __init__(self):
            self.defects = []

    # Defect type used by the handle_defect tests; it must be an exception
    # so that a raise_on_defect policy can raise it.
    class MyDefect(Exception):
        pass

    def test_handle_defect_raises_on_strict(self):
        foo = self.MyObj()
        defect = self.MyDefect("the telly is broken")
        with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
            email.policy.strict.handle_defect(foo, defect)

    def test_handle_defect_registers_defect(self):
        foo = self.MyObj()
        defect1 = self.MyDefect("one")
        email.policy.default.handle_defect(foo, defect1)
        self.assertEqual(foo.defects, [defect1])
        defect2 = self.MyDefect("two")
        email.policy.default.handle_defect(foo, defect2)
        self.assertEqual(foo.defects, [defect1, defect2])

    # Policy that collects defects on itself instead of on the object passed
    # to register_defect.
    class MyPolicy(email.policy.EmailPolicy):
        # Declared at class level so that 'defects' is accepted as a
        # constructor keyword.
        defects = None
        def __init__(self, *args, **kw):
            super().__init__(*args, defects=[], **kw)
        def register_defect(self, obj, defect):
            self.defects.append(defect)

    def test_overridden_register_defect_still_raises(self):
        foo = self.MyObj()
        defect = self.MyDefect("the telly is broken")
        with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
            self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)

    def test_overridden_register_defect_works(self):
        foo = self.MyObj()
        defect1 = self.MyDefect("one")
        my_policy = self.MyPolicy()
        my_policy.handle_defect(foo, defect1)
        self.assertEqual(my_policy.defects, [defect1])
        self.assertEqual(foo.defects, [])
        defect2 = self.MyDefect("two")
        my_policy.handle_defect(foo, defect2)
        self.assertEqual(my_policy.defects, [defect1, defect2])
        self.assertEqual(foo.defects, [])

    def test_default_header_factory(self):
        h = email.policy.default.header_factory('Test', 'test')
        self.assertEqual(h.name, 'Test')
        self.assertIsInstance(h, headerregistry.UnstructuredHeader)
        self.assertIsInstance(h, headerregistry.BaseHeader)

    # Header class used to check which factory produced a given header.
    class Foo:
        parse = headerregistry.UnstructuredHeader.parse

    def test_each_Policy_gets_unique_factory(self):
        policy1 = email.policy.EmailPolicy()
        policy2 = email.policy.EmailPolicy()
        policy1.header_factory.map_to_type('foo', self.Foo)
        h = policy1.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)
        self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
        h = policy2.header_factory('foo', 'test')
        self.assertNotIsInstance(h, self.Foo)
        self.assertIsInstance(h, headerregistry.UnstructuredHeader)

    def test_clone_copies_factory(self):
        # The mapping is registered after cloning, yet the clone sees it:
        # both policies refer to the same factory object.
        policy1 = email.policy.EmailPolicy()
        policy2 = policy1.clone()
        policy1.header_factory.map_to_type('foo', self.Foo)
        h = policy1.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)
        h = policy2.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)

    def test_new_factory_overrides_default(self):
        mypolicy = email.policy.EmailPolicy()
        myfactory = mypolicy.header_factory
        newpolicy = mypolicy + email.policy.strict
        self.assertEqual(newpolicy.header_factory, myfactory)
        newpolicy = email.policy.strict + mypolicy
        self.assertEqual(newpolicy.header_factory, myfactory)

    def test_adding_default_policies_preserves_default_factory(self):
        newpolicy = email.policy.default + email.policy.strict
        self.assertEqual(newpolicy.header_factory,
                         email.policy.EmailPolicy.header_factory)
        self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})

    # XXX: Need subclassing tests.
    # For adding subclassed objects, make sure the usual rules apply (subclass
    # wins), but that the order still works (right overrides left).
243
244
class TestException(Exception):
    """Marker exception raised by the deliberately broken test policy."""
247
class TestPolicyPropagation(unittest.TestCase):
    """Check that a policy handed to the public entry points is the one
    used by the parser, the parsed messages and the generator."""

    # The abstract methods are used by the parser but not by the wrapper
    # functions that call it, so if the exception gets raised we know that the
    # policy was actually propagated all the way to feedparser.
    class MyPolicy(email.policy.Policy):
        def badmethod(self, *args, **kw):
            raise TestException("test")
        fold = fold_binary = header_fetch_parse = badmethod
        header_source_parse = header_store_parse = badmethod

    def test_message_from_string(self):
        with self.assertRaisesRegex(TestException, "^test$"):
            email.message_from_string("Subject: test\n\n",
                                      policy=self.MyPolicy)

    def test_message_from_bytes(self):
        with self.assertRaisesRegex(TestException, "^test$"):
            email.message_from_bytes(b"Subject: test\n\n",
                                     policy=self.MyPolicy)

    def test_message_from_file(self):
        f = io.StringIO('Subject: test\n\n')
        with self.assertRaisesRegex(TestException, "^test$"):
            email.message_from_file(f, policy=self.MyPolicy)

    def test_message_from_binary_file(self):
        f = io.BytesIO(b'Subject: test\n\n')
        with self.assertRaisesRegex(TestException, "^test$"):
            email.message_from_binary_file(f, policy=self.MyPolicy)

    # These are redundant, but we need them for black-box completeness.

    def test_parser(self):
        p = email.parser.Parser(policy=self.MyPolicy)
        with self.assertRaisesRegex(TestException, "^test$"):
            p.parsestr('Subject: test\n\n')

    def test_bytes_parser(self):
        p = email.parser.BytesParser(policy=self.MyPolicy)
        with self.assertRaisesRegex(TestException, "^test$"):
            p.parsebytes(b'Subject: test\n\n')

    # Now that we've established that all the parse methods get the
    # policy in to feedparser, we can use message_from_string for
    # the rest of the propagation tests.

    def _make_msg(self, source='Subject: test\n\n', policy=None):
        # Remember the policy on the test case so that the calling test can
        # compare it with what ended up on the parsed message.
        self.policy = email.policy.default.clone() if policy is None else policy
        return email.message_from_string(source, policy=self.policy)

    def test_parser_propagates_policy_to_message(self):
        msg = self._make_msg()
        self.assertIs(msg.policy, self.policy)

    def test_parser_propagates_policy_to_sub_messages(self):
        # The boundary parameter must be introduced by ';'.  With any other
        # separator it is not recognised, the body is not split into parts
        # and there would be no sub-messages to check.
        msg = self._make_msg(textwrap.dedent("""\
            Subject: mime test
            MIME-Version: 1.0
            Content-Type: multipart/mixed; boundary="XXX"

            --XXX
            Content-Type: text/plain

            test
            --XXX
            Content-Type: text/plain

            test2
            --XXX--
            """))
        parts = list(msg.walk())
        # The container plus its two text/plain parts; this guards against
        # the loop below passing on the root message alone.
        self.assertEqual(len(parts), 3)
        for part in parts:
            self.assertIs(part.policy, self.policy)

    def test_message_policy_propagates_to_generator(self):
        # No policy is given to the generator, so the 'X' line separator in
        # the output can only have come from the message's own policy.
        msg = self._make_msg("Subject: test\nTo: foo\n\n",
                             policy=email.policy.default.clone(linesep='X'))
        s = io.StringIO()
        g = email.generator.Generator(s)
        g.flatten(msg)
        self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")

    def test_message_policy_used_by_as_string(self):
        msg = self._make_msg("Subject: test\nTo: foo\n\n",
                             policy=email.policy.default.clone(linesep='X'))
        self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
334
335
class TestConcretePolicies(unittest.TestCase):
    """Checks for behaviour specific to the concrete EmailPolicy class."""

    def test_header_store_parse_rejects_newlines(self):
        # EmailPolicy.header_store_parse is documented to raise ValueError
        # when the value contains CR or LF characters, so check a bare LF,
        # a bare CR and a CRLF pair.
        instance = email.policy.EmailPolicy()
        for value in ('spam\negg@foo.py',
                      'spam\regg@foo.py',
                      'spam\r\negg@foo.py'):
            with self.subTest(value=value):
                with self.assertRaises(ValueError):
                    instance.header_store_parse('From', value)
343
344
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
347