1import io 2import types 3import textwrap 4import unittest 5import email.policy 6import email.parser 7import email.generator 8import email.message 9from email import headerregistry 10 11def make_defaults(base_defaults, differences): 12 defaults = base_defaults.copy() 13 defaults.update(differences) 14 return defaults 15 16class PolicyAPITests(unittest.TestCase): 17 18 longMessage = True 19 20 # Base default values. 21 compat32_defaults = { 22 'max_line_length': 78, 23 'linesep': '\n', 24 'cte_type': '8bit', 25 'raise_on_defect': False, 26 'mangle_from_': True, 27 'message_factory': None, 28 } 29 # These default values are the ones set on email.policy.default. 30 # If any of these defaults change, the docs must be updated. 31 policy_defaults = compat32_defaults.copy() 32 policy_defaults.update({ 33 'utf8': False, 34 'raise_on_defect': False, 35 'header_factory': email.policy.EmailPolicy.header_factory, 36 'refold_source': 'long', 37 'content_manager': email.policy.EmailPolicy.content_manager, 38 'mangle_from_': False, 39 'message_factory': email.message.EmailMessage, 40 }) 41 42 # For each policy under test, we give here what we expect the defaults to 43 # be for that policy. The second argument to make defaults is the 44 # difference between the base defaults and that for the particular policy. 45 new_policy = email.policy.EmailPolicy() 46 policies = { 47 email.policy.compat32: make_defaults(compat32_defaults, {}), 48 email.policy.default: make_defaults(policy_defaults, {}), 49 email.policy.SMTP: make_defaults(policy_defaults, 50 {'linesep': '\r\n'}), 51 email.policy.SMTPUTF8: make_defaults(policy_defaults, 52 {'linesep': '\r\n', 53 'utf8': True}), 54 email.policy.HTTP: make_defaults(policy_defaults, 55 {'linesep': '\r\n', 56 'max_line_length': None}), 57 email.policy.strict: make_defaults(policy_defaults, 58 {'raise_on_defect': True}), 59 new_policy: make_defaults(policy_defaults, {}), 60 } 61 # Creating a new policy creates a new header factory. There is a test 62 # later that proves this. 63 policies[new_policy]['header_factory'] = new_policy.header_factory 64 65 def test_defaults(self): 66 for policy, expected in self.policies.items(): 67 for attr, value in expected.items(): 68 with self.subTest(policy=policy, attr=attr): 69 self.assertEqual(getattr(policy, attr), value, 70 ("change {} docs/docstrings if defaults have " 71 "changed").format(policy)) 72 73 def test_all_attributes_covered(self): 74 for policy, expected in self.policies.items(): 75 for attr in dir(policy): 76 with self.subTest(policy=policy, attr=attr): 77 if (attr.startswith('_') or 78 isinstance(getattr(email.policy.EmailPolicy, attr), 79 types.FunctionType)): 80 continue 81 else: 82 self.assertIn(attr, expected, 83 "{} is not fully tested".format(attr)) 84 85 def test_abc(self): 86 with self.assertRaises(TypeError) as cm: 87 email.policy.Policy() 88 msg = str(cm.exception) 89 abstract_methods = ('fold', 90 'fold_binary', 91 'header_fetch_parse', 92 'header_source_parse', 93 'header_store_parse') 94 for method in abstract_methods: 95 self.assertIn(method, msg) 96 97 def test_policy_is_immutable(self): 98 for policy, defaults in self.policies.items(): 99 for attr in defaults: 100 with self.assertRaisesRegex(AttributeError, attr+".*read-only"): 101 setattr(policy, attr, None) 102 with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'): 103 policy.foo = None 104 105 def test_set_policy_attrs_when_cloned(self): 106 # None of the attributes has a default value of None, so we set them 107 # all to None in the clone call and check that it worked. 108 for policyclass, defaults in self.policies.items(): 109 testattrdict = {attr: None for attr in defaults} 110 policy = policyclass.clone(**testattrdict) 111 for attr in defaults: 112 self.assertIsNone(getattr(policy, attr)) 113 114 def test_reject_non_policy_keyword_when_called(self): 115 for policyclass in self.policies: 116 with self.assertRaises(TypeError): 117 policyclass(this_keyword_should_not_be_valid=None) 118 with self.assertRaises(TypeError): 119 policyclass(newtline=None) 120 121 def test_policy_addition(self): 122 expected = self.policy_defaults.copy() 123 p1 = email.policy.default.clone(max_line_length=100) 124 p2 = email.policy.default.clone(max_line_length=50) 125 added = p1 + p2 126 expected.update(max_line_length=50) 127 for attr, value in expected.items(): 128 self.assertEqual(getattr(added, attr), value) 129 added = p2 + p1 130 expected.update(max_line_length=100) 131 for attr, value in expected.items(): 132 self.assertEqual(getattr(added, attr), value) 133 added = added + email.policy.default 134 for attr, value in expected.items(): 135 self.assertEqual(getattr(added, attr), value) 136 137 def test_register_defect(self): 138 class Dummy: 139 def __init__(self): 140 self.defects = [] 141 obj = Dummy() 142 defect = object() 143 policy = email.policy.EmailPolicy() 144 policy.register_defect(obj, defect) 145 self.assertEqual(obj.defects, [defect]) 146 defect2 = object() 147 policy.register_defect(obj, defect2) 148 self.assertEqual(obj.defects, [defect, defect2]) 149 150 class MyObj: 151 def __init__(self): 152 self.defects = [] 153 154 class MyDefect(Exception): 155 pass 156 157 def test_handle_defect_raises_on_strict(self): 158 foo = self.MyObj() 159 defect = self.MyDefect("the telly is broken") 160 with self.assertRaisesRegex(self.MyDefect, "the telly is broken"): 161 email.policy.strict.handle_defect(foo, defect) 162 163 def test_handle_defect_registers_defect(self): 164 foo = self.MyObj() 165 defect1 = self.MyDefect("one") 166 email.policy.default.handle_defect(foo, defect1) 167 self.assertEqual(foo.defects, [defect1]) 168 defect2 = self.MyDefect("two") 169 email.policy.default.handle_defect(foo, defect2) 170 self.assertEqual(foo.defects, [defect1, defect2]) 171 172 class MyPolicy(email.policy.EmailPolicy): 173 defects = None 174 def __init__(self, *args, **kw): 175 super().__init__(*args, defects=[], **kw) 176 def register_defect(self, obj, defect): 177 self.defects.append(defect) 178 179 def test_overridden_register_defect_still_raises(self): 180 foo = self.MyObj() 181 defect = self.MyDefect("the telly is broken") 182 with self.assertRaisesRegex(self.MyDefect, "the telly is broken"): 183 self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect) 184 185 def test_overridden_register_defect_works(self): 186 foo = self.MyObj() 187 defect1 = self.MyDefect("one") 188 my_policy = self.MyPolicy() 189 my_policy.handle_defect(foo, defect1) 190 self.assertEqual(my_policy.defects, [defect1]) 191 self.assertEqual(foo.defects, []) 192 defect2 = self.MyDefect("two") 193 my_policy.handle_defect(foo, defect2) 194 self.assertEqual(my_policy.defects, [defect1, defect2]) 195 self.assertEqual(foo.defects, []) 196 197 def test_default_header_factory(self): 198 h = email.policy.default.header_factory('Test', 'test') 199 self.assertEqual(h.name, 'Test') 200 self.assertIsInstance(h, headerregistry.UnstructuredHeader) 201 self.assertIsInstance(h, headerregistry.BaseHeader) 202 203 class Foo: 204 parse = headerregistry.UnstructuredHeader.parse 205 206 def test_each_Policy_gets_unique_factory(self): 207 policy1 = email.policy.EmailPolicy() 208 policy2 = email.policy.EmailPolicy() 209 policy1.header_factory.map_to_type('foo', self.Foo) 210 h = policy1.header_factory('foo', 'test') 211 self.assertIsInstance(h, self.Foo) 212 self.assertNotIsInstance(h, headerregistry.UnstructuredHeader) 213 h = policy2.header_factory('foo', 'test') 214 self.assertNotIsInstance(h, self.Foo) 215 self.assertIsInstance(h, headerregistry.UnstructuredHeader) 216 217 def test_clone_copies_factory(self): 218 policy1 = email.policy.EmailPolicy() 219 policy2 = policy1.clone() 220 policy1.header_factory.map_to_type('foo', self.Foo) 221 h = policy1.header_factory('foo', 'test') 222 self.assertIsInstance(h, self.Foo) 223 h = policy2.header_factory('foo', 'test') 224 self.assertIsInstance(h, self.Foo) 225 226 def test_new_factory_overrides_default(self): 227 mypolicy = email.policy.EmailPolicy() 228 myfactory = mypolicy.header_factory 229 newpolicy = mypolicy + email.policy.strict 230 self.assertEqual(newpolicy.header_factory, myfactory) 231 newpolicy = email.policy.strict + mypolicy 232 self.assertEqual(newpolicy.header_factory, myfactory) 233 234 def test_adding_default_policies_preserves_default_factory(self): 235 newpolicy = email.policy.default + email.policy.strict 236 self.assertEqual(newpolicy.header_factory, 237 email.policy.EmailPolicy.header_factory) 238 self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True}) 239 240 # XXX: Need subclassing tests. 241 # For adding subclassed objects, make sure the usual rules apply (subclass 242 # wins), but that the order still works (right overrides left). 243 244 245class TestException(Exception): 246 pass 247 248class TestPolicyPropagation(unittest.TestCase): 249 250 # The abstract methods are used by the parser but not by the wrapper 251 # functions that call it, so if the exception gets raised we know that the 252 # policy was actually propagated all the way to feedparser. 253 class MyPolicy(email.policy.Policy): 254 def badmethod(self, *args, **kw): 255 raise TestException("test") 256 fold = fold_binary = header_fetch_parser = badmethod 257 header_source_parse = header_store_parse = badmethod 258 259 def test_message_from_string(self): 260 with self.assertRaisesRegex(TestException, "^test$"): 261 email.message_from_string("Subject: test\n\n", 262 policy=self.MyPolicy) 263 264 def test_message_from_bytes(self): 265 with self.assertRaisesRegex(TestException, "^test$"): 266 email.message_from_bytes(b"Subject: test\n\n", 267 policy=self.MyPolicy) 268 269 def test_message_from_file(self): 270 f = io.StringIO('Subject: test\n\n') 271 with self.assertRaisesRegex(TestException, "^test$"): 272 email.message_from_file(f, policy=self.MyPolicy) 273 274 def test_message_from_binary_file(self): 275 f = io.BytesIO(b'Subject: test\n\n') 276 with self.assertRaisesRegex(TestException, "^test$"): 277 email.message_from_binary_file(f, policy=self.MyPolicy) 278 279 # These are redundant, but we need them for black-box completeness. 280 281 def test_parser(self): 282 p = email.parser.Parser(policy=self.MyPolicy) 283 with self.assertRaisesRegex(TestException, "^test$"): 284 p.parsestr('Subject: test\n\n') 285 286 def test_bytes_parser(self): 287 p = email.parser.BytesParser(policy=self.MyPolicy) 288 with self.assertRaisesRegex(TestException, "^test$"): 289 p.parsebytes(b'Subject: test\n\n') 290 291 # Now that we've established that all the parse methods get the 292 # policy in to feedparser, we can use message_from_string for 293 # the rest of the propagation tests. 294 295 def _make_msg(self, source='Subject: test\n\n', policy=None): 296 self.policy = email.policy.default.clone() if policy is None else policy 297 return email.message_from_string(source, policy=self.policy) 298 299 def test_parser_propagates_policy_to_message(self): 300 msg = self._make_msg() 301 self.assertIs(msg.policy, self.policy) 302 303 def test_parser_propagates_policy_to_sub_messages(self): 304 msg = self._make_msg(textwrap.dedent("""\ 305 Subject: mime test 306 MIME-Version: 1.0 307 Content-Type: multipart/mixed, boundary="XXX" 308 309 --XXX 310 Content-Type: text/plain 311 312 test 313 --XXX 314 Content-Type: text/plain 315 316 test2 317 --XXX-- 318 """)) 319 for part in msg.walk(): 320 self.assertIs(part.policy, self.policy) 321 322 def test_message_policy_propagates_to_generator(self): 323 msg = self._make_msg("Subject: test\nTo: foo\n\n", 324 policy=email.policy.default.clone(linesep='X')) 325 s = io.StringIO() 326 g = email.generator.Generator(s) 327 g.flatten(msg) 328 self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX") 329 330 def test_message_policy_used_by_as_string(self): 331 msg = self._make_msg("Subject: test\nTo: foo\n\n", 332 policy=email.policy.default.clone(linesep='X')) 333 self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX") 334 335 336class TestConcretePolicies(unittest.TestCase): 337 338 def test_header_store_parse_rejects_newlines(self): 339 instance = email.policy.EmailPolicy() 340 self.assertRaises(ValueError, 341 instance.header_store_parse, 342 'From', 'spam\negg@foo.py') 343 344 345if __name__ == '__main__': 346 unittest.main() 347