1import os 2import sys 3import time 4import stat 5import socket 6import email 7import email.message 8import re 9import shutil 10import StringIO 11import tempfile 12from test import test_support 13import unittest 14import mailbox 15import glob 16try: 17 import fcntl 18except ImportError: 19 pass 20 21# Silence Py3k warning 22rfc822 = test_support.import_module('rfc822', deprecated=True) 23 24class TestBase: 25 26 def _check_sample(self, msg): 27 # Inspect a mailbox.Message representation of the sample message 28 self.assertIsInstance(msg, email.message.Message) 29 self.assertIsInstance(msg, mailbox.Message) 30 for key, value in _sample_headers.iteritems(): 31 self.assertIn(value, msg.get_all(key)) 32 self.assertTrue(msg.is_multipart()) 33 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 34 for i, payload in enumerate(_sample_payloads): 35 part = msg.get_payload(i) 36 self.assertIsInstance(part, email.message.Message) 37 self.assertNotIsInstance(part, mailbox.Message) 38 self.assertEqual(part.get_payload(), payload) 39 40 def _delete_recursively(self, target): 41 # Delete a file or delete a directory recursively 42 if os.path.isdir(target): 43 test_support.rmtree(target) 44 elif os.path.exists(target): 45 test_support.unlink(target) 46 47 48class TestMailbox(TestBase): 49 50 _factory = None # Overridden by subclasses to reuse tests 51 _template = 'From: foo\n\n%s\n' 52 53 def setUp(self): 54 self._path = test_support.TESTFN 55 self._delete_recursively(self._path) 56 self._box = self._factory(self._path) 57 58 def tearDown(self): 59 self._box.close() 60 self._delete_recursively(self._path) 61 62 def test_add(self): 63 # Add copies of a sample message 64 keys = [] 65 keys.append(self._box.add(self._template % 0)) 66 self.assertEqual(len(self._box), 1) 67 keys.append(self._box.add(mailbox.Message(_sample_message))) 68 self.assertEqual(len(self._box), 2) 69 keys.append(self._box.add(email.message_from_string(_sample_message))) 70 self.assertEqual(len(self._box), 3) 71 keys.append(self._box.add(StringIO.StringIO(_sample_message))) 72 self.assertEqual(len(self._box), 4) 73 keys.append(self._box.add(_sample_message)) 74 self.assertEqual(len(self._box), 5) 75 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 76 for i in (1, 2, 3, 4): 77 self._check_sample(self._box[keys[i]]) 78 79 def test_add_file(self): 80 with tempfile.TemporaryFile('w+') as f: 81 f.write(_sample_message) 82 f.seek(0) 83 key = self._box.add(f) 84 self.assertEqual(self._box.get_string(key).split('\n'), 85 _sample_message.split('\n')) 86 87 def test_add_StringIO(self): 88 key = self._box.add(StringIO.StringIO(self._template % "0")) 89 self.assertEqual(self._box.get_string(key), self._template % "0") 90 91 def test_remove(self): 92 # Remove messages using remove() 93 self._test_remove_or_delitem(self._box.remove) 94 95 def test_delitem(self): 96 # Remove messages using __delitem__() 97 self._test_remove_or_delitem(self._box.__delitem__) 98 99 def _test_remove_or_delitem(self, method): 100 # (Used by test_remove() and test_delitem().) 101 key0 = self._box.add(self._template % 0) 102 key1 = self._box.add(self._template % 1) 103 self.assertEqual(len(self._box), 2) 104 method(key0) 105 l = len(self._box) 106 self.assertEqual(l, 1) 107 self.assertRaises(KeyError, lambda: self._box[key0]) 108 self.assertRaises(KeyError, lambda: method(key0)) 109 self.assertEqual(self._box.get_string(key1), self._template % 1) 110 key2 = self._box.add(self._template % 2) 111 self.assertEqual(len(self._box), 2) 112 method(key2) 113 l = len(self._box) 114 self.assertEqual(l, 1) 115 self.assertRaises(KeyError, lambda: self._box[key2]) 116 self.assertRaises(KeyError, lambda: method(key2)) 117 self.assertEqual(self._box.get_string(key1), self._template % 1) 118 method(key1) 119 self.assertEqual(len(self._box), 0) 120 self.assertRaises(KeyError, lambda: self._box[key1]) 121 self.assertRaises(KeyError, lambda: method(key1)) 122 123 def test_discard(self, repetitions=10): 124 # Discard messages 125 key0 = self._box.add(self._template % 0) 126 key1 = self._box.add(self._template % 1) 127 self.assertEqual(len(self._box), 2) 128 self._box.discard(key0) 129 self.assertEqual(len(self._box), 1) 130 self.assertRaises(KeyError, lambda: self._box[key0]) 131 self._box.discard(key0) 132 self.assertEqual(len(self._box), 1) 133 self.assertRaises(KeyError, lambda: self._box[key0]) 134 135 def test_get(self): 136 # Retrieve messages using get() 137 key0 = self._box.add(self._template % 0) 138 msg = self._box.get(key0) 139 self.assertEqual(msg['from'], 'foo') 140 self.assertEqual(msg.get_payload(), '0\n') 141 self.assertIsNone(self._box.get('foo')) 142 self.assertFalse(self._box.get('foo', False)) 143 self._box.close() 144 self._box = self._factory(self._path, factory=rfc822.Message) 145 key1 = self._box.add(self._template % 1) 146 msg = self._box.get(key1) 147 self.assertEqual(msg['from'], 'foo') 148 self.assertEqual(msg.fp.read(), '1' + os.linesep) 149 msg.fp.close() 150 151 def test_getitem(self): 152 # Retrieve message using __getitem__() 153 key0 = self._box.add(self._template % 0) 154 msg = self._box[key0] 155 self.assertEqual(msg['from'], 'foo') 156 self.assertEqual(msg.get_payload(), '0\n') 157 self.assertRaises(KeyError, lambda: self._box['foo']) 158 self._box.discard(key0) 159 self.assertRaises(KeyError, lambda: self._box[key0]) 160 161 def test_get_message(self): 162 # Get Message representations of messages 163 key0 = self._box.add(self._template % 0) 164 key1 = self._box.add(_sample_message) 165 msg0 = self._box.get_message(key0) 166 self.assertIsInstance(msg0, mailbox.Message) 167 self.assertEqual(msg0['from'], 'foo') 168 self.assertEqual(msg0.get_payload(), '0\n') 169 self._check_sample(self._box.get_message(key1)) 170 171 def test_get_string(self): 172 # Get string representations of messages 173 key0 = self._box.add(self._template % 0) 174 key1 = self._box.add(_sample_message) 175 self.assertEqual(self._box.get_string(key0), self._template % 0) 176 self.assertEqual(self._box.get_string(key1), _sample_message) 177 178 def test_get_file(self): 179 # Get file representations of messages 180 key0 = self._box.add(self._template % 0) 181 key1 = self._box.add(_sample_message) 182 msg0 = self._box.get_file(key0) 183 self.assertEqual(msg0.read().replace(os.linesep, '\n'), 184 self._template % 0) 185 msg1 = self._box.get_file(key1) 186 self.assertEqual(msg1.read().replace(os.linesep, '\n'), 187 _sample_message) 188 msg0.close() 189 msg1.close() 190 191 def test_get_file_can_be_closed_twice(self): 192 # Issue 11700 193 key = self._box.add(_sample_message) 194 f = self._box.get_file(key) 195 f.close() 196 f.close() 197 198 def test_iterkeys(self): 199 # Get keys using iterkeys() 200 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 201 202 def test_keys(self): 203 # Get keys using keys() 204 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 205 206 def test_itervalues(self): 207 # Get values using itervalues() 208 self._check_iteration(self._box.itervalues, do_keys=False, 209 do_values=True) 210 211 def test_iter(self): 212 # Get values using __iter__() 213 self._check_iteration(self._box.__iter__, do_keys=False, 214 do_values=True) 215 216 def test_values(self): 217 # Get values using values() 218 self._check_iteration(self._box.values, do_keys=False, do_values=True) 219 220 def test_iteritems(self): 221 # Get keys and values using iteritems() 222 self._check_iteration(self._box.iteritems, do_keys=True, 223 do_values=True) 224 225 def test_items(self): 226 # Get keys and values using items() 227 self._check_iteration(self._box.items, do_keys=True, do_values=True) 228 229 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 230 for value in method(): 231 self.fail("Not empty") 232 keys, values = [], [] 233 for i in xrange(repetitions): 234 keys.append(self._box.add(self._template % i)) 235 values.append(self._template % i) 236 if do_keys and not do_values: 237 returned_keys = list(method()) 238 elif do_values and not do_keys: 239 returned_values = list(method()) 240 else: 241 returned_keys, returned_values = [], [] 242 for key, value in method(): 243 returned_keys.append(key) 244 returned_values.append(value) 245 if do_keys: 246 self.assertEqual(len(keys), len(returned_keys)) 247 self.assertEqual(set(keys), set(returned_keys)) 248 if do_values: 249 count = 0 250 for value in returned_values: 251 self.assertEqual(value['from'], 'foo') 252 self.assertLess(int(value.get_payload()), repetitions) 253 count += 1 254 self.assertEqual(len(values), count) 255 256 def test_has_key(self): 257 # Check existence of keys using has_key() 258 self._test_has_key_or_contains(self._box.has_key) 259 260 def test_contains(self): 261 # Check existence of keys using __contains__() 262 self._test_has_key_or_contains(self._box.__contains__) 263 264 def _test_has_key_or_contains(self, method): 265 # (Used by test_has_key() and test_contains().) 266 self.assertFalse(method('foo')) 267 key0 = self._box.add(self._template % 0) 268 self.assertTrue(method(key0)) 269 self.assertFalse(method('foo')) 270 key1 = self._box.add(self._template % 1) 271 self.assertTrue(method(key1)) 272 self.assertTrue(method(key0)) 273 self.assertFalse(method('foo')) 274 self._box.remove(key0) 275 self.assertFalse(method(key0)) 276 self.assertTrue(method(key1)) 277 self.assertFalse(method('foo')) 278 self._box.remove(key1) 279 self.assertFalse(method(key1)) 280 self.assertFalse(method(key0)) 281 self.assertFalse(method('foo')) 282 283 def test_len(self, repetitions=10): 284 # Get message count 285 keys = [] 286 for i in xrange(repetitions): 287 self.assertEqual(len(self._box), i) 288 keys.append(self._box.add(self._template % i)) 289 self.assertEqual(len(self._box), i + 1) 290 for i in xrange(repetitions): 291 self.assertEqual(len(self._box), repetitions - i) 292 self._box.remove(keys[i]) 293 self.assertEqual(len(self._box), repetitions - i - 1) 294 295 def test_set_item(self): 296 # Modify messages using __setitem__() 297 key0 = self._box.add(self._template % 'original 0') 298 self.assertEqual(self._box.get_string(key0), 299 self._template % 'original 0') 300 key1 = self._box.add(self._template % 'original 1') 301 self.assertEqual(self._box.get_string(key1), 302 self._template % 'original 1') 303 self._box[key0] = self._template % 'changed 0' 304 self.assertEqual(self._box.get_string(key0), 305 self._template % 'changed 0') 306 self._box[key1] = self._template % 'changed 1' 307 self.assertEqual(self._box.get_string(key1), 308 self._template % 'changed 1') 309 self._box[key0] = _sample_message 310 self._check_sample(self._box[key0]) 311 self._box[key1] = self._box[key0] 312 self._check_sample(self._box[key1]) 313 self._box[key0] = self._template % 'original 0' 314 self.assertEqual(self._box.get_string(key0), 315 self._template % 'original 0') 316 self._check_sample(self._box[key1]) 317 self.assertRaises(KeyError, 318 lambda: self._box.__setitem__('foo', 'bar')) 319 self.assertRaises(KeyError, lambda: self._box['foo']) 320 self.assertEqual(len(self._box), 2) 321 322 def test_clear(self, iterations=10): 323 # Remove all messages using clear() 324 keys = [] 325 for i in xrange(iterations): 326 self._box.add(self._template % i) 327 for i, key in enumerate(keys): 328 self.assertEqual(self._box.get_string(key), self._template % i) 329 self._box.clear() 330 self.assertEqual(len(self._box), 0) 331 for i, key in enumerate(keys): 332 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 333 334 def test_pop(self): 335 # Get and remove a message using pop() 336 key0 = self._box.add(self._template % 0) 337 self.assertIn(key0, self._box) 338 key1 = self._box.add(self._template % 1) 339 self.assertIn(key1, self._box) 340 self.assertEqual(self._box.pop(key0).get_payload(), '0\n') 341 self.assertNotIn(key0, self._box) 342 self.assertIn(key1, self._box) 343 key2 = self._box.add(self._template % 2) 344 self.assertIn(key2, self._box) 345 self.assertEqual(self._box.pop(key2).get_payload(), '2\n') 346 self.assertNotIn(key2, self._box) 347 self.assertIn(key1, self._box) 348 self.assertEqual(self._box.pop(key1).get_payload(), '1\n') 349 self.assertNotIn(key1, self._box) 350 self.assertEqual(len(self._box), 0) 351 352 def test_popitem(self, iterations=10): 353 # Get and remove an arbitrary (key, message) using popitem() 354 keys = [] 355 for i in xrange(10): 356 keys.append(self._box.add(self._template % i)) 357 seen = [] 358 for i in xrange(10): 359 key, msg = self._box.popitem() 360 self.assertIn(key, keys) 361 self.assertNotIn(key, seen) 362 seen.append(key) 363 self.assertEqual(int(msg.get_payload()), keys.index(key)) 364 self.assertEqual(len(self._box), 0) 365 for key in keys: 366 self.assertRaises(KeyError, lambda: self._box[key]) 367 368 def test_update(self): 369 # Modify multiple messages using update() 370 key0 = self._box.add(self._template % 'original 0') 371 key1 = self._box.add(self._template % 'original 1') 372 key2 = self._box.add(self._template % 'original 2') 373 self._box.update({key0: self._template % 'changed 0', 374 key2: _sample_message}) 375 self.assertEqual(len(self._box), 3) 376 self.assertEqual(self._box.get_string(key0), 377 self._template % 'changed 0') 378 self.assertEqual(self._box.get_string(key1), 379 self._template % 'original 1') 380 self._check_sample(self._box[key2]) 381 self._box.update([(key2, self._template % 'changed 2'), 382 (key1, self._template % 'changed 1'), 383 (key0, self._template % 'original 0')]) 384 self.assertEqual(len(self._box), 3) 385 self.assertEqual(self._box.get_string(key0), 386 self._template % 'original 0') 387 self.assertEqual(self._box.get_string(key1), 388 self._template % 'changed 1') 389 self.assertEqual(self._box.get_string(key2), 390 self._template % 'changed 2') 391 self.assertRaises(KeyError, 392 lambda: self._box.update({'foo': 'bar', 393 key0: self._template % "changed 0"})) 394 self.assertEqual(len(self._box), 3) 395 self.assertEqual(self._box.get_string(key0), 396 self._template % "changed 0") 397 self.assertEqual(self._box.get_string(key1), 398 self._template % "changed 1") 399 self.assertEqual(self._box.get_string(key2), 400 self._template % "changed 2") 401 402 def test_flush(self): 403 # Write changes to disk 404 self._test_flush_or_close(self._box.flush, True) 405 406 def test_popitem_and_flush_twice(self): 407 # See #15036. 408 self._box.add(self._template % 0) 409 self._box.add(self._template % 1) 410 self._box.flush() 411 412 self._box.popitem() 413 self._box.flush() 414 self._box.popitem() 415 self._box.flush() 416 417 def test_lock_unlock(self): 418 # Lock and unlock the mailbox 419 self.assertFalse(os.path.exists(self._get_lock_path())) 420 self._box.lock() 421 self.assertTrue(os.path.exists(self._get_lock_path())) 422 self._box.unlock() 423 self.assertFalse(os.path.exists(self._get_lock_path())) 424 425 def test_close(self): 426 # Close mailbox and flush changes to disk 427 self._test_flush_or_close(self._box.close, False) 428 429 def _test_flush_or_close(self, method, should_call_close): 430 contents = [self._template % i for i in xrange(3)] 431 self._box.add(contents[0]) 432 self._box.add(contents[1]) 433 self._box.add(contents[2]) 434 oldbox = self._box 435 method() 436 if should_call_close: 437 self._box.close() 438 self._box = self._factory(self._path) 439 keys = self._box.keys() 440 self.assertEqual(len(keys), 3) 441 for key in keys: 442 self.assertIn(self._box.get_string(key), contents) 443 oldbox.close() 444 445 def test_dump_message(self): 446 # Write message representations to disk 447 for input in (email.message_from_string(_sample_message), 448 _sample_message, StringIO.StringIO(_sample_message)): 449 output = StringIO.StringIO() 450 self._box._dump_message(input, output) 451 self.assertEqual(output.getvalue(), 452 _sample_message.replace('\n', os.linesep)) 453 output = StringIO.StringIO() 454 self.assertRaises(TypeError, 455 lambda: self._box._dump_message(None, output)) 456 457 def _get_lock_path(self): 458 # Return the path of the dot lock file. May be overridden. 459 return self._path + '.lock' 460 461 462class TestMailboxSuperclass(TestBase, unittest.TestCase): 463 464 def test_notimplemented(self): 465 # Test that all Mailbox methods raise NotImplementedException. 466 box = mailbox.Mailbox('path') 467 self.assertRaises(NotImplementedError, lambda: box.add('')) 468 self.assertRaises(NotImplementedError, lambda: box.remove('')) 469 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 470 self.assertRaises(NotImplementedError, lambda: box.discard('')) 471 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 472 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 473 self.assertRaises(NotImplementedError, lambda: box.keys()) 474 self.assertRaises(NotImplementedError, lambda: box.itervalues().next()) 475 self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) 476 self.assertRaises(NotImplementedError, lambda: box.values()) 477 self.assertRaises(NotImplementedError, lambda: box.iteritems().next()) 478 self.assertRaises(NotImplementedError, lambda: box.items()) 479 self.assertRaises(NotImplementedError, lambda: box.get('')) 480 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 481 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 482 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 483 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 484 self.assertRaises(NotImplementedError, lambda: box.has_key('')) 485 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 486 self.assertRaises(NotImplementedError, lambda: box.__len__()) 487 self.assertRaises(NotImplementedError, lambda: box.clear()) 488 self.assertRaises(NotImplementedError, lambda: box.pop('')) 489 self.assertRaises(NotImplementedError, lambda: box.popitem()) 490 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 491 self.assertRaises(NotImplementedError, lambda: box.flush()) 492 self.assertRaises(NotImplementedError, lambda: box.lock()) 493 self.assertRaises(NotImplementedError, lambda: box.unlock()) 494 self.assertRaises(NotImplementedError, lambda: box.close()) 495 496 497class TestMaildir(TestMailbox, unittest.TestCase): 498 499 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 500 501 def setUp(self): 502 TestMailbox.setUp(self) 503 if os.name in ('nt', 'os2') or sys.platform == 'cygwin': 504 self._box.colon = '!' 505 506 def test_add_MM(self): 507 # Add a MaildirMessage instance 508 msg = mailbox.MaildirMessage(self._template % 0) 509 msg.set_subdir('cur') 510 msg.set_info('foo') 511 key = self._box.add(msg) 512 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 513 (key, self._box.colon)))) 514 515 def test_get_MM(self): 516 # Get a MaildirMessage instance 517 msg = mailbox.MaildirMessage(self._template % 0) 518 msg.set_subdir('cur') 519 msg.set_flags('RF') 520 key = self._box.add(msg) 521 msg_returned = self._box.get_message(key) 522 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 523 self.assertEqual(msg_returned.get_subdir(), 'cur') 524 self.assertEqual(msg_returned.get_flags(), 'FR') 525 526 def test_set_MM(self): 527 # Set with a MaildirMessage instance 528 msg0 = mailbox.MaildirMessage(self._template % 0) 529 msg0.set_flags('TP') 530 key = self._box.add(msg0) 531 msg_returned = self._box.get_message(key) 532 self.assertEqual(msg_returned.get_subdir(), 'new') 533 self.assertEqual(msg_returned.get_flags(), 'PT') 534 msg1 = mailbox.MaildirMessage(self._template % 1) 535 self._box[key] = msg1 536 msg_returned = self._box.get_message(key) 537 self.assertEqual(msg_returned.get_subdir(), 'new') 538 self.assertEqual(msg_returned.get_flags(), '') 539 self.assertEqual(msg_returned.get_payload(), '1\n') 540 msg2 = mailbox.MaildirMessage(self._template % 2) 541 msg2.set_info('2,S') 542 self._box[key] = msg2 543 self._box[key] = self._template % 3 544 msg_returned = self._box.get_message(key) 545 self.assertEqual(msg_returned.get_subdir(), 'new') 546 self.assertEqual(msg_returned.get_flags(), 'S') 547 self.assertEqual(msg_returned.get_payload(), '3\n') 548 549 def test_consistent_factory(self): 550 # Add a message. 551 msg = mailbox.MaildirMessage(self._template % 0) 552 msg.set_subdir('cur') 553 msg.set_flags('RF') 554 key = self._box.add(msg) 555 556 # Create new mailbox with 557 class FakeMessage(mailbox.MaildirMessage): 558 pass 559 box = mailbox.Maildir(self._path, factory=FakeMessage) 560 box.colon = self._box.colon 561 msg2 = box.get_message(key) 562 self.assertIsInstance(msg2, FakeMessage) 563 564 def test_initialize_new(self): 565 # Initialize a non-existent mailbox 566 self.tearDown() 567 self._box = mailbox.Maildir(self._path) 568 self._check_basics(factory=rfc822.Message) 569 self._delete_recursively(self._path) 570 self._box = self._factory(self._path, factory=None) 571 self._check_basics() 572 573 def test_initialize_existing(self): 574 # Initialize an existing mailbox 575 self.tearDown() 576 for subdir in '', 'tmp', 'new', 'cur': 577 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 578 self._box = mailbox.Maildir(self._path) 579 self._check_basics(factory=rfc822.Message) 580 self._box = mailbox.Maildir(self._path, factory=None) 581 self._check_basics() 582 583 def _check_basics(self, factory=None): 584 # (Used by test_open_new() and test_open_existing().) 585 self.assertEqual(self._box._path, os.path.abspath(self._path)) 586 self.assertEqual(self._box._factory, factory) 587 for subdir in '', 'tmp', 'new', 'cur': 588 path = os.path.join(self._path, subdir) 589 mode = os.stat(path)[stat.ST_MODE] 590 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 591 592 def test_list_folders(self): 593 # List folders 594 self._box.add_folder('one') 595 self._box.add_folder('two') 596 self._box.add_folder('three') 597 self.assertEqual(len(self._box.list_folders()), 3) 598 self.assertEqual(set(self._box.list_folders()), 599 set(('one', 'two', 'three'))) 600 601 def test_get_folder(self): 602 # Open folders 603 self._box.add_folder('foo.bar') 604 folder0 = self._box.get_folder('foo.bar') 605 folder0.add(self._template % 'bar') 606 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 607 folder1 = self._box.get_folder('foo.bar') 608 self.assertEqual(folder1.get_string(folder1.keys()[0]), 609 self._template % 'bar') 610 611 def test_add_and_remove_folders(self): 612 # Delete folders 613 self._box.add_folder('one') 614 self._box.add_folder('two') 615 self.assertEqual(len(self._box.list_folders()), 2) 616 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 617 self._box.remove_folder('one') 618 self.assertEqual(len(self._box.list_folders()), 1) 619 self.assertEqual(set(self._box.list_folders()), set(('two',))) 620 self._box.add_folder('three') 621 self.assertEqual(len(self._box.list_folders()), 2) 622 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 623 self._box.remove_folder('three') 624 self.assertEqual(len(self._box.list_folders()), 1) 625 self.assertEqual(set(self._box.list_folders()), set(('two',))) 626 self._box.remove_folder('two') 627 self.assertEqual(len(self._box.list_folders()), 0) 628 self.assertEqual(self._box.list_folders(), []) 629 630 def test_clean(self): 631 # Remove old files from 'tmp' 632 foo_path = os.path.join(self._path, 'tmp', 'foo') 633 bar_path = os.path.join(self._path, 'tmp', 'bar') 634 with open(foo_path, 'w') as f: 635 f.write("@") 636 with open(bar_path, 'w') as f: 637 f.write("@") 638 self._box.clean() 639 self.assertTrue(os.path.exists(foo_path)) 640 self.assertTrue(os.path.exists(bar_path)) 641 foo_stat = os.stat(foo_path) 642 os.utime(foo_path, (time.time() - 129600 - 2, 643 foo_stat.st_mtime)) 644 self._box.clean() 645 self.assertFalse(os.path.exists(foo_path)) 646 self.assertTrue(os.path.exists(bar_path)) 647 648 def test_create_tmp(self, repetitions=10): 649 # Create files in tmp directory 650 hostname = socket.gethostname() 651 if '/' in hostname: 652 hostname = hostname.replace('/', r'\057') 653 if ':' in hostname: 654 hostname = hostname.replace(':', r'\072') 655 pid = os.getpid() 656 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 657 r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)") 658 previous_groups = None 659 for x in xrange(repetitions): 660 tmp_file = self._box._create_tmp() 661 head, tail = os.path.split(tmp_file.name) 662 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 663 "tmp")), 664 "File in wrong location: '%s'" % head) 665 match = pattern.match(tail) 666 self.assertIsNotNone(match, "Invalid file name: '%s'" % tail) 667 groups = match.groups() 668 if previous_groups is not None: 669 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]), 670 "Non-monotonic seconds: '%s' before '%s'" % 671 (previous_groups[0], groups[0])) 672 if int(groups[0]) == int(previous_groups[0]): 673 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]), 674 "Non-monotonic milliseconds: '%s' before '%s'" % 675 (previous_groups[1], groups[1])) 676 self.assertEqual(int(groups[2]), pid, 677 "Process ID mismatch: '%s' should be '%s'" % 678 (groups[2], pid)) 679 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1, 680 "Non-sequential counter: '%s' before '%s'" % 681 (previous_groups[3], groups[3])) 682 self.assertEqual(groups[4], hostname, 683 "Host name mismatch: '%s' should be '%s'" % 684 (groups[4], hostname)) 685 previous_groups = groups 686 tmp_file.write(_sample_message) 687 tmp_file.seek(0) 688 self.assertEqual(tmp_file.read(), _sample_message) 689 tmp_file.close() 690 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 691 self.assertEqual(file_count, repetitions, 692 "Wrong file count: '%s' should be '%s'" % 693 (file_count, repetitions)) 694 695 def test_refresh(self): 696 # Update the table of contents 697 self.assertEqual(self._box._toc, {}) 698 key0 = self._box.add(self._template % 0) 699 key1 = self._box.add(self._template % 1) 700 self.assertEqual(self._box._toc, {}) 701 self._box._refresh() 702 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 703 key1: os.path.join('new', key1)}) 704 key2 = self._box.add(self._template % 2) 705 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 706 key1: os.path.join('new', key1)}) 707 self._box._refresh() 708 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 709 key1: os.path.join('new', key1), 710 key2: os.path.join('new', key2)}) 711 712 def test_refresh_after_safety_period(self): 713 # Issue #13254: Call _refresh after the "file system safety 714 # period" of 2 seconds has passed; _toc should still be 715 # updated because this is the first call to _refresh. 716 key0 = self._box.add(self._template % 0) 717 key1 = self._box.add(self._template % 1) 718 719 self._box = self._factory(self._path) 720 self.assertEqual(self._box._toc, {}) 721 722 # Emulate sleeping. Instead of sleeping for 2 seconds, use the 723 # skew factor to make _refresh think that the filesystem 724 # safety period has passed and re-reading the _toc is only 725 # required if mtimes differ. 726 self._box._skewfactor = -3 727 728 self._box._refresh() 729 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1])) 730 731 def test_lookup(self): 732 # Look up message subpaths in the TOC 733 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 734 key0 = self._box.add(self._template % 0) 735 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 736 os.remove(os.path.join(self._path, 'new', key0)) 737 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 738 # Be sure that the TOC is read back from disk (see issue #6896 739 # about bad mtime behaviour on some systems). 740 self._box.flush() 741 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 742 self.assertEqual(self._box._toc, {}) 743 744 def test_lock_unlock(self): 745 # Lock and unlock the mailbox. For Maildir, this does nothing. 746 self._box.lock() 747 self._box.unlock() 748 749 def test_folder (self): 750 # Test for bug #1569790: verify that folders returned by .get_folder() 751 # use the same factory function. 752 def dummy_factory (s): 753 return None 754 box = self._factory(self._path, factory=dummy_factory) 755 folder = box.add_folder('folder1') 756 self.assertIs(folder._factory, dummy_factory) 757 758 folder1_alias = box.get_folder('folder1') 759 self.assertIs(folder1_alias._factory, dummy_factory) 760 761 def test_directory_in_folder (self): 762 # Test that mailboxes still work if there's a stray extra directory 763 # in a folder. 764 for i in range(10): 765 self._box.add(mailbox.Message(_sample_message)) 766 767 # Create a stray directory 768 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 769 770 # Check that looping still works with the directory present. 771 for msg in self._box: 772 pass 773 774 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 775 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 776 def test_file_permissions(self): 777 # Verify that message files are created without execute permissions 778 msg = mailbox.MaildirMessage(self._template % 0) 779 orig_umask = os.umask(0) 780 try: 781 key = self._box.add(msg) 782 finally: 783 os.umask(orig_umask) 784 path = os.path.join(self._path, self._box._lookup(key)) 785 mode = os.stat(path).st_mode 786 self.assertEqual(mode & 0111, 0) 787 788 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 789 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 790 def test_folder_file_perms(self): 791 # From bug #3228, we want to verify that the file created inside a Maildir 792 # subfolder isn't marked as executable. 793 orig_umask = os.umask(0) 794 try: 795 subfolder = self._box.add_folder('subfolder') 796 finally: 797 os.umask(orig_umask) 798 799 path = os.path.join(subfolder._path, 'maildirfolder') 800 st = os.stat(path) 801 perms = st.st_mode 802 self.assertFalse((perms & 0111)) # Execute bits should all be off. 803 804 def test_reread(self): 805 # Do an initial unconditional refresh 806 self._box._refresh() 807 808 # Put the last modified times more than two seconds into the past 809 # (because mtime may have only a two second granularity). 810 for subdir in ('cur', 'new'): 811 os.utime(os.path.join(self._box._path, subdir), 812 (time.time()-5,)*2) 813 814 # Because mtime has a two second granularity in worst case (FAT), a 815 # refresh is done unconditionally if called for within 816 # two-second-plus-a-bit of the last one, just in case the mbox has 817 # changed; so now we have to wait for that interval to expire. 818 # 819 # Because this is a test, emulate sleeping. Instead of 820 # sleeping for 2 seconds, use the skew factor to make _refresh 821 # think that 2 seconds have passed and re-reading the _toc is 822 # only required if mtimes differ. 823 self._box._skewfactor = -3 824 825 # Re-reading causes the ._toc attribute to be assigned a new dictionary 826 # object, so we'll check that the ._toc attribute isn't a different 827 # object. 828 orig_toc = self._box._toc 829 def refreshed(): 830 return self._box._toc is not orig_toc 831 832 self._box._refresh() 833 self.assertFalse(refreshed()) 834 835 # Now, write something into cur and remove it. This changes 836 # the mtime and should cause a re-read. Note that "sleep 837 # emulation" is still in effect, as skewfactor is -3. 838 filename = os.path.join(self._path, 'cur', 'stray-file') 839 f = open(filename, 'w') 840 f.close() 841 os.unlink(filename) 842 self._box._refresh() 843 self.assertTrue(refreshed()) 844 845 846class _TestSingleFile(TestMailbox): 847 '''Common tests for single-file mailboxes''' 848 849 def test_add_doesnt_rewrite(self): 850 # When only adding messages, flush() should not rewrite the 851 # mailbox file. See issue #9559. 852 853 # Inode number changes if the contents are written to another 854 # file which is then renamed over the original file. So we 855 # must check that the inode number doesn't change. 856 inode_before = os.stat(self._path).st_ino 857 858 self._box.add(self._template % 0) 859 self._box.flush() 860 861 inode_after = os.stat(self._path).st_ino 862 self.assertEqual(inode_before, inode_after) 863 864 # Make sure the message was really added 865 self._box.close() 866 self._box = self._factory(self._path) 867 self.assertEqual(len(self._box), 1) 868 869 def test_permissions_after_flush(self): 870 # See issue #5346 871 872 # Make the mailbox world writable. It's unlikely that the new 873 # mailbox file would have these permissions after flush(), 874 # because umask usually prevents it. 875 mode = os.stat(self._path).st_mode | 0o666 876 os.chmod(self._path, mode) 877 878 self._box.add(self._template % 0) 879 i = self._box.add(self._template % 1) 880 # Need to remove one message to make flush() create a new file 881 self._box.remove(i) 882 self._box.flush() 883 884 self.assertEqual(os.stat(self._path).st_mode, mode) 885 886 887class _TestMboxMMDF(_TestSingleFile): 888 889 def tearDown(self): 890 self._box.close() 891 self._delete_recursively(self._path) 892 for lock_remnant in glob.glob(self._path + '.*'): 893 test_support.unlink(lock_remnant) 894 895 def test_add_from_string(self): 896 # Add a string starting with 'From ' to the mailbox 897 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n') 898 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 899 self.assertEqual(self._box[key].get_payload(), '0\n') 900 901 def test_add_mbox_or_mmdf_message(self): 902 # Add an mboxMessage or MMDFMessage 903 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 904 msg = class_('From foo@bar blah\nFrom: foo\n\n0\n') 905 key = self._box.add(msg) 906 907 def test_open_close_open(self): 908 # Open and inspect previously-created mailbox 909 values = [self._template % i for i in xrange(3)] 910 for value in values: 911 self._box.add(value) 912 self._box.close() 913 mtime = os.path.getmtime(self._path) 914 self._box = self._factory(self._path) 915 self.assertEqual(len(self._box), 3) 916 for key in self._box.iterkeys(): 917 self.assertIn(self._box.get_string(key), values) 918 self._box.close() 919 self.assertEqual(mtime, os.path.getmtime(self._path)) 920 921 def test_add_and_close(self): 922 # Verifying that closing a mailbox doesn't change added items 923 self._box.add(_sample_message) 924 for i in xrange(3): 925 self._box.add(self._template % i) 926 self._box.add(_sample_message) 927 self._box._file.flush() 928 self._box._file.seek(0) 929 contents = self._box._file.read() 930 self._box.close() 931 with open(self._path, 'rb') as f: 932 self.assertEqual(contents, f.read()) 933 self._box = self._factory(self._path) 934 935 @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().") 936 @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().") 937 def test_lock_conflict(self): 938 # Fork off a child process that will lock the mailbox temporarily, 939 # unlock it and exit. 940 c, p = socket.socketpair() 941 self.addCleanup(c.close) 942 self.addCleanup(p.close) 943 944 pid = os.fork() 945 if pid == 0: 946 # child 947 try: 948 # lock the mailbox, and signal the parent it can proceed 949 self._box.lock() 950 c.send(b'c') 951 952 # wait until the parent is done, and unlock the mailbox 953 c.recv(1) 954 self._box.unlock() 955 finally: 956 os._exit(0) 957 958 # In the parent, wait until the child signals it locked the mailbox. 959 p.recv(1) 960 try: 961 self.assertRaises(mailbox.ExternalClashError, 962 self._box.lock) 963 finally: 964 # Signal the child it can now release the lock and exit. 965 p.send(b'p') 966 # Wait for child to exit. Locking should now succeed. 967 exited_pid, status = os.waitpid(pid, 0) 968 969 self._box.lock() 970 self._box.unlock() 971 972 def test_relock(self): 973 # Test case for bug #1575506: the mailbox class was locking the 974 # wrong file object in its flush() method. 975 msg = "Subject: sub\n\nbody\n" 976 key1 = self._box.add(msg) 977 self._box.flush() 978 self._box.close() 979 980 self._box = self._factory(self._path) 981 self._box.lock() 982 key2 = self._box.add(msg) 983 self._box.flush() 984 self.assertTrue(self._box._locked) 985 self._box.close() 986 987 988class TestMbox(_TestMboxMMDF, unittest.TestCase): 989 990 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 991 992 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 993 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 994 def test_file_perms(self): 995 # From bug #3228, we want to verify that the mailbox file isn't executable, 996 # even if the umask is set to something that would leave executable bits set. 997 # We only run this test on platforms that support umask. 998 try: 999 old_umask = os.umask(0077) 1000 self._box.close() 1001 os.unlink(self._path) 1002 self._box = mailbox.mbox(self._path, create=True) 1003 self._box.add('') 1004 self._box.close() 1005 finally: 1006 os.umask(old_umask) 1007 1008 st = os.stat(self._path) 1009 perms = st.st_mode 1010 self.assertFalse((perms & 0111)) # Execute bits should all be off. 1011 1012 def test_terminating_newline(self): 1013 message = email.message.Message() 1014 message['From'] = 'john@example.com' 1015 message.set_payload('No newline at the end') 1016 i = self._box.add(message) 1017 1018 # A newline should have been appended to the payload 1019 message = self._box.get(i) 1020 self.assertEqual(message.get_payload(), 'No newline at the end\n') 1021 1022 def test_message_separator(self): 1023 # Check there's always a single blank line after each message 1024 self._box.add('From: foo\n\n0') # No newline at the end 1025 with open(self._path) as f: 1026 data = f.read() 1027 self.assertEqual(data[-3:], '0\n\n') 1028 1029 self._box.add('From: foo\n\n0\n') # Newline at the end 1030 with open(self._path) as f: 1031 data = f.read() 1032 self.assertEqual(data[-3:], '0\n\n') 1033 1034 1035class TestMMDF(_TestMboxMMDF, unittest.TestCase): 1036 1037 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 1038 1039 1040class TestMH(TestMailbox, unittest.TestCase): 1041 1042 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 1043 1044 def test_list_folders(self): 1045 # List folders 1046 self._box.add_folder('one') 1047 self._box.add_folder('two') 1048 self._box.add_folder('three') 1049 self.assertEqual(len(self._box.list_folders()), 3) 1050 self.assertEqual(set(self._box.list_folders()), 1051 set(('one', 'two', 'three'))) 1052 1053 def test_get_folder(self): 1054 # Open folders 1055 def dummy_factory (s): 1056 return None 1057 self._box = self._factory(self._path, dummy_factory) 1058 1059 new_folder = self._box.add_folder('foo.bar') 1060 folder0 = self._box.get_folder('foo.bar') 1061 folder0.add(self._template % 'bar') 1062 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 1063 folder1 = self._box.get_folder('foo.bar') 1064 self.assertEqual(folder1.get_string(folder1.keys()[0]), 1065 self._template % 'bar') 1066 1067 # Test for bug #1569790: verify that folders returned by .get_folder() 1068 # use the same factory function. 1069 self.assertIs(new_folder._factory, self._box._factory) 1070 self.assertIs(folder0._factory, self._box._factory) 1071 1072 def test_add_and_remove_folders(self): 1073 # Delete folders 1074 self._box.add_folder('one') 1075 self._box.add_folder('two') 1076 self.assertEqual(len(self._box.list_folders()), 2) 1077 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 1078 self._box.remove_folder('one') 1079 self.assertEqual(len(self._box.list_folders()), 1) 1080 self.assertEqual(set(self._box.list_folders()), set(('two', ))) 1081 self._box.add_folder('three') 1082 self.assertEqual(len(self._box.list_folders()), 2) 1083 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 1084 self._box.remove_folder('three') 1085 self.assertEqual(len(self._box.list_folders()), 1) 1086 self.assertEqual(set(self._box.list_folders()), set(('two', ))) 1087 self._box.remove_folder('two') 1088 self.assertEqual(len(self._box.list_folders()), 0) 1089 self.assertEqual(self._box.list_folders(), []) 1090 1091 def test_sequences(self): 1092 # Get and set sequences 1093 self.assertEqual(self._box.get_sequences(), {}) 1094 msg0 = mailbox.MHMessage(self._template % 0) 1095 msg0.add_sequence('foo') 1096 key0 = self._box.add(msg0) 1097 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 1098 msg1 = mailbox.MHMessage(self._template % 1) 1099 msg1.set_sequences(['bar', 'replied', 'foo']) 1100 key1 = self._box.add(msg1) 1101 self.assertEqual(self._box.get_sequences(), 1102 {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) 1103 msg0.set_sequences(['flagged']) 1104 self._box[key0] = msg0 1105 self.assertEqual(self._box.get_sequences(), 1106 {'foo':[key1], 'bar':[key1], 'replied':[key1], 1107 'flagged':[key0]}) 1108 self._box.remove(key1) 1109 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 1110 1111 def test_issue2625(self): 1112 msg0 = mailbox.MHMessage(self._template % 0) 1113 msg0.add_sequence('foo') 1114 key0 = self._box.add(msg0) 1115 refmsg0 = self._box.get_message(key0) 1116 1117 def test_issue7627(self): 1118 msg0 = mailbox.MHMessage(self._template % 0) 1119 key0 = self._box.add(msg0) 1120 self._box.lock() 1121 self._box.remove(key0) 1122 self._box.unlock() 1123 1124 def test_pack(self): 1125 # Pack the contents of the mailbox 1126 msg0 = mailbox.MHMessage(self._template % 0) 1127 msg1 = mailbox.MHMessage(self._template % 1) 1128 msg2 = mailbox.MHMessage(self._template % 2) 1129 msg3 = mailbox.MHMessage(self._template % 3) 1130 msg0.set_sequences(['foo', 'unseen']) 1131 msg1.set_sequences(['foo']) 1132 msg2.set_sequences(['foo', 'flagged']) 1133 msg3.set_sequences(['foo', 'bar', 'replied']) 1134 key0 = self._box.add(msg0) 1135 key1 = self._box.add(msg1) 1136 key2 = self._box.add(msg2) 1137 key3 = self._box.add(msg3) 1138 self.assertEqual(self._box.get_sequences(), 1139 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1140 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1141 self._box.remove(key2) 1142 self.assertEqual(self._box.get_sequences(), 1143 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1144 'replied':[key3]}) 1145 self._box.pack() 1146 self.assertEqual(self._box.keys(), [1, 2, 3]) 1147 key0 = key0 1148 key1 = key0 + 1 1149 key2 = key1 + 1 1150 self.assertEqual(self._box.get_sequences(), 1151 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1152 1153 # Test case for packing while holding the mailbox locked. 1154 key0 = self._box.add(msg1) 1155 key1 = self._box.add(msg1) 1156 key2 = self._box.add(msg1) 1157 key3 = self._box.add(msg1) 1158 1159 self._box.remove(key0) 1160 self._box.remove(key2) 1161 self._box.lock() 1162 self._box.pack() 1163 self._box.unlock() 1164 self.assertEqual(self._box.get_sequences(), 1165 {'foo':[1, 2, 3, 4, 5], 1166 'unseen':[1], 'bar':[3], 'replied':[3]}) 1167 1168 def _get_lock_path(self): 1169 return os.path.join(self._path, '.mh_sequences.lock') 1170 1171 1172class TestBabyl(_TestSingleFile, unittest.TestCase): 1173 1174 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1175 1176 def tearDown(self): 1177 self._box.close() 1178 self._delete_recursively(self._path) 1179 for lock_remnant in glob.glob(self._path + '.*'): 1180 test_support.unlink(lock_remnant) 1181 1182 def test_labels(self): 1183 # Get labels from the mailbox 1184 self.assertEqual(self._box.get_labels(), []) 1185 msg0 = mailbox.BabylMessage(self._template % 0) 1186 msg0.add_label('foo') 1187 key0 = self._box.add(msg0) 1188 self.assertEqual(self._box.get_labels(), ['foo']) 1189 msg1 = mailbox.BabylMessage(self._template % 1) 1190 msg1.set_labels(['bar', 'answered', 'foo']) 1191 key1 = self._box.add(msg1) 1192 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1193 msg0.set_labels(['blah', 'filed']) 1194 self._box[key0] = msg0 1195 self.assertEqual(set(self._box.get_labels()), 1196 set(['foo', 'bar', 'blah'])) 1197 self._box.remove(key1) 1198 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1199 1200 1201class TestMessage(TestBase, unittest.TestCase): 1202 1203 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1204 1205 def setUp(self): 1206 self._path = test_support.TESTFN 1207 1208 def tearDown(self): 1209 self._delete_recursively(self._path) 1210 1211 def test_initialize_with_eMM(self): 1212 # Initialize based on email.message.Message instance 1213 eMM = email.message_from_string(_sample_message) 1214 msg = self._factory(eMM) 1215 self._post_initialize_hook(msg) 1216 self._check_sample(msg) 1217 1218 def test_initialize_with_string(self): 1219 # Initialize based on string 1220 msg = self._factory(_sample_message) 1221 self._post_initialize_hook(msg) 1222 self._check_sample(msg) 1223 1224 def test_initialize_with_file(self): 1225 # Initialize based on contents of file 1226 with open(self._path, 'w+') as f: 1227 f.write(_sample_message) 1228 f.seek(0) 1229 msg = self._factory(f) 1230 self._post_initialize_hook(msg) 1231 self._check_sample(msg) 1232 1233 def test_initialize_with_nothing(self): 1234 # Initialize without arguments 1235 msg = self._factory() 1236 self._post_initialize_hook(msg) 1237 self.assertIsInstance(msg, email.message.Message) 1238 self.assertIsInstance(msg, mailbox.Message) 1239 self.assertIsInstance(msg, self._factory) 1240 self.assertEqual(msg.keys(), []) 1241 self.assertFalse(msg.is_multipart()) 1242 self.assertIsNone(msg.get_payload()) 1243 1244 def test_initialize_incorrectly(self): 1245 # Initialize with invalid argument 1246 self.assertRaises(TypeError, lambda: self._factory(object())) 1247 1248 def test_become_message(self): 1249 # Take on the state of another message 1250 eMM = email.message_from_string(_sample_message) 1251 msg = self._factory() 1252 msg._become_message(eMM) 1253 self._check_sample(msg) 1254 1255 def test_explain_to(self): 1256 # Copy self's format-specific data to other message formats. 1257 # This test is superficial; better ones are in TestMessageConversion. 1258 msg = self._factory() 1259 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1260 mailbox.mboxMessage, mailbox.MHMessage, 1261 mailbox.BabylMessage, mailbox.MMDFMessage): 1262 other_msg = class_() 1263 msg._explain_to(other_msg) 1264 other_msg = email.message.Message() 1265 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1266 1267 def _post_initialize_hook(self, msg): 1268 # Overridden by subclasses to check extra things after initialization 1269 pass 1270 1271 1272class TestMaildirMessage(TestMessage, unittest.TestCase): 1273 1274 _factory = mailbox.MaildirMessage 1275 1276 def _post_initialize_hook(self, msg): 1277 self.assertEqual(msg._subdir, 'new') 1278 self.assertEqual(msg._info,'') 1279 1280 def test_subdir(self): 1281 # Use get_subdir() and set_subdir() 1282 msg = mailbox.MaildirMessage(_sample_message) 1283 self.assertEqual(msg.get_subdir(), 'new') 1284 msg.set_subdir('cur') 1285 self.assertEqual(msg.get_subdir(), 'cur') 1286 msg.set_subdir('new') 1287 self.assertEqual(msg.get_subdir(), 'new') 1288 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1289 self.assertEqual(msg.get_subdir(), 'new') 1290 msg.set_subdir('new') 1291 self.assertEqual(msg.get_subdir(), 'new') 1292 self._check_sample(msg) 1293 1294 def test_flags(self): 1295 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1296 msg = mailbox.MaildirMessage(_sample_message) 1297 self.assertEqual(msg.get_flags(), '') 1298 self.assertEqual(msg.get_subdir(), 'new') 1299 msg.set_flags('F') 1300 self.assertEqual(msg.get_subdir(), 'new') 1301 self.assertEqual(msg.get_flags(), 'F') 1302 msg.set_flags('SDTP') 1303 self.assertEqual(msg.get_flags(), 'DPST') 1304 msg.add_flag('FT') 1305 self.assertEqual(msg.get_flags(), 'DFPST') 1306 msg.remove_flag('TDRP') 1307 self.assertEqual(msg.get_flags(), 'FS') 1308 self.assertEqual(msg.get_subdir(), 'new') 1309 self._check_sample(msg) 1310 1311 def test_date(self): 1312 # Use get_date() and set_date() 1313 msg = mailbox.MaildirMessage(_sample_message) 1314 diff = msg.get_date() - time.time() 1315 self.assertLess(abs(diff), 60, diff) 1316 msg.set_date(0.0) 1317 self.assertEqual(msg.get_date(), 0.0) 1318 1319 def test_info(self): 1320 # Use get_info() and set_info() 1321 msg = mailbox.MaildirMessage(_sample_message) 1322 self.assertEqual(msg.get_info(), '') 1323 msg.set_info('1,foo=bar') 1324 self.assertEqual(msg.get_info(), '1,foo=bar') 1325 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1326 self._check_sample(msg) 1327 1328 def test_info_and_flags(self): 1329 # Test interaction of info and flag methods 1330 msg = mailbox.MaildirMessage(_sample_message) 1331 self.assertEqual(msg.get_info(), '') 1332 msg.set_flags('SF') 1333 self.assertEqual(msg.get_flags(), 'FS') 1334 self.assertEqual(msg.get_info(), '2,FS') 1335 msg.set_info('1,') 1336 self.assertEqual(msg.get_flags(), '') 1337 self.assertEqual(msg.get_info(), '1,') 1338 msg.remove_flag('RPT') 1339 self.assertEqual(msg.get_flags(), '') 1340 self.assertEqual(msg.get_info(), '1,') 1341 msg.add_flag('D') 1342 self.assertEqual(msg.get_flags(), 'D') 1343 self.assertEqual(msg.get_info(), '2,D') 1344 self._check_sample(msg) 1345 1346 1347class _TestMboxMMDFMessage: 1348 1349 _factory = mailbox._mboxMMDFMessage 1350 1351 def _post_initialize_hook(self, msg): 1352 self._check_from(msg) 1353 1354 def test_initialize_with_unixfrom(self): 1355 # Initialize with a message that already has a _unixfrom attribute 1356 msg = mailbox.Message(_sample_message) 1357 msg.set_unixfrom('From foo@bar blah') 1358 msg = mailbox.mboxMessage(msg) 1359 self.assertEqual(msg.get_from(), 'foo@bar blah') 1360 1361 def test_from(self): 1362 # Get and set "From " line 1363 msg = mailbox.mboxMessage(_sample_message) 1364 self._check_from(msg) 1365 msg.set_from('foo bar') 1366 self.assertEqual(msg.get_from(), 'foo bar') 1367 msg.set_from('foo@bar', True) 1368 self._check_from(msg, 'foo@bar') 1369 msg.set_from('blah@temp', time.localtime()) 1370 self._check_from(msg, 'blah@temp') 1371 1372 def test_flags(self): 1373 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1374 msg = mailbox.mboxMessage(_sample_message) 1375 self.assertEqual(msg.get_flags(), '') 1376 msg.set_flags('F') 1377 self.assertEqual(msg.get_flags(), 'F') 1378 msg.set_flags('XODR') 1379 self.assertEqual(msg.get_flags(), 'RODX') 1380 msg.add_flag('FA') 1381 self.assertEqual(msg.get_flags(), 'RODFAX') 1382 msg.remove_flag('FDXA') 1383 self.assertEqual(msg.get_flags(), 'RO') 1384 self._check_sample(msg) 1385 1386 def _check_from(self, msg, sender=None): 1387 # Check contents of "From " line 1388 if sender is None: 1389 sender = "MAILER-DAEMON" 1390 self.assertIsNotNone(re.match( 1391 sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}", 1392 msg.get_from())) 1393 1394 1395class TestMboxMessage(_TestMboxMMDFMessage, TestMessage): 1396 1397 _factory = mailbox.mboxMessage 1398 1399 1400class TestMHMessage(TestMessage, unittest.TestCase): 1401 1402 _factory = mailbox.MHMessage 1403 1404 def _post_initialize_hook(self, msg): 1405 self.assertEqual(msg._sequences, []) 1406 1407 def test_sequences(self): 1408 # Get, set, join, and leave sequences 1409 msg = mailbox.MHMessage(_sample_message) 1410 self.assertEqual(msg.get_sequences(), []) 1411 msg.set_sequences(['foobar']) 1412 self.assertEqual(msg.get_sequences(), ['foobar']) 1413 msg.set_sequences([]) 1414 self.assertEqual(msg.get_sequences(), []) 1415 msg.add_sequence('unseen') 1416 self.assertEqual(msg.get_sequences(), ['unseen']) 1417 msg.add_sequence('flagged') 1418 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1419 msg.add_sequence('flagged') 1420 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1421 msg.remove_sequence('unseen') 1422 self.assertEqual(msg.get_sequences(), ['flagged']) 1423 msg.add_sequence('foobar') 1424 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1425 msg.remove_sequence('replied') 1426 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1427 msg.set_sequences(['foobar', 'replied']) 1428 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1429 1430 1431class TestBabylMessage(TestMessage, unittest.TestCase): 1432 1433 _factory = mailbox.BabylMessage 1434 1435 def _post_initialize_hook(self, msg): 1436 self.assertEqual(msg._labels, []) 1437 1438 def test_labels(self): 1439 # Get, set, join, and leave labels 1440 msg = mailbox.BabylMessage(_sample_message) 1441 self.assertEqual(msg.get_labels(), []) 1442 msg.set_labels(['foobar']) 1443 self.assertEqual(msg.get_labels(), ['foobar']) 1444 msg.set_labels([]) 1445 self.assertEqual(msg.get_labels(), []) 1446 msg.add_label('filed') 1447 self.assertEqual(msg.get_labels(), ['filed']) 1448 msg.add_label('resent') 1449 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1450 msg.add_label('resent') 1451 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1452 msg.remove_label('filed') 1453 self.assertEqual(msg.get_labels(), ['resent']) 1454 msg.add_label('foobar') 1455 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1456 msg.remove_label('unseen') 1457 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1458 msg.set_labels(['foobar', 'answered']) 1459 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1460 1461 def test_visible(self): 1462 # Get, set, and update visible headers 1463 msg = mailbox.BabylMessage(_sample_message) 1464 visible = msg.get_visible() 1465 self.assertEqual(visible.keys(), []) 1466 self.assertIsNone(visible.get_payload()) 1467 visible['User-Agent'] = 'FooBar 1.0' 1468 visible['X-Whatever'] = 'Blah' 1469 self.assertEqual(msg.get_visible().keys(), []) 1470 msg.set_visible(visible) 1471 visible = msg.get_visible() 1472 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1473 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1474 self.assertEqual(visible['X-Whatever'], 'Blah') 1475 self.assertIsNone(visible.get_payload()) 1476 msg.update_visible() 1477 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1478 self.assertIsNone(visible.get_payload()) 1479 visible = msg.get_visible() 1480 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1481 'Subject']) 1482 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1483 self.assertEqual(visible[header], msg[header]) 1484 1485 1486class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage): 1487 1488 _factory = mailbox.MMDFMessage 1489 1490 1491class TestMessageConversion(TestBase, unittest.TestCase): 1492 1493 def test_plain_to_x(self): 1494 # Convert Message to all formats 1495 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1496 mailbox.mboxMessage, mailbox.MHMessage, 1497 mailbox.BabylMessage, mailbox.MMDFMessage): 1498 msg_plain = mailbox.Message(_sample_message) 1499 msg = class_(msg_plain) 1500 self._check_sample(msg) 1501 1502 def test_x_to_plain(self): 1503 # Convert all formats to Message 1504 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1505 mailbox.mboxMessage, mailbox.MHMessage, 1506 mailbox.BabylMessage, mailbox.MMDFMessage): 1507 msg = class_(_sample_message) 1508 msg_plain = mailbox.Message(msg) 1509 self._check_sample(msg_plain) 1510 1511 def test_x_to_invalid(self): 1512 # Convert all formats to an invalid format 1513 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1514 mailbox.mboxMessage, mailbox.MHMessage, 1515 mailbox.BabylMessage, mailbox.MMDFMessage): 1516 self.assertRaises(TypeError, lambda: class_(False)) 1517 1518 def test_maildir_to_maildir(self): 1519 # Convert MaildirMessage to MaildirMessage 1520 msg_maildir = mailbox.MaildirMessage(_sample_message) 1521 msg_maildir.set_flags('DFPRST') 1522 msg_maildir.set_subdir('cur') 1523 date = msg_maildir.get_date() 1524 msg = mailbox.MaildirMessage(msg_maildir) 1525 self._check_sample(msg) 1526 self.assertEqual(msg.get_flags(), 'DFPRST') 1527 self.assertEqual(msg.get_subdir(), 'cur') 1528 self.assertEqual(msg.get_date(), date) 1529 1530 def test_maildir_to_mboxmmdf(self): 1531 # Convert MaildirMessage to mboxmessage and MMDFMessage 1532 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1533 ('T', 'D'), ('DFPRST', 'RDFA')) 1534 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1535 msg_maildir = mailbox.MaildirMessage(_sample_message) 1536 msg_maildir.set_date(0.0) 1537 for setting, result in pairs: 1538 msg_maildir.set_flags(setting) 1539 msg = class_(msg_maildir) 1540 self.assertEqual(msg.get_flags(), result) 1541 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1542 time.asctime(time.gmtime(0.0))) 1543 msg_maildir.set_subdir('cur') 1544 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1545 1546 def test_maildir_to_mh(self): 1547 # Convert MaildirMessage to MHMessage 1548 msg_maildir = mailbox.MaildirMessage(_sample_message) 1549 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1550 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1551 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1552 for setting, result in pairs: 1553 msg_maildir.set_flags(setting) 1554 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1555 result) 1556 1557 def test_maildir_to_babyl(self): 1558 # Convert MaildirMessage to Babyl 1559 msg_maildir = mailbox.MaildirMessage(_sample_message) 1560 pairs = (('D', ['unseen']), ('F', ['unseen']), 1561 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1562 ('S', []), ('T', ['unseen', 'deleted']), 1563 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1564 for setting, result in pairs: 1565 msg_maildir.set_flags(setting) 1566 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1567 result) 1568 1569 def test_mboxmmdf_to_maildir(self): 1570 # Convert mboxMessage and MMDFMessage to MaildirMessage 1571 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1572 msg_mboxMMDF = class_(_sample_message) 1573 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1574 pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1575 ('RODFA', 'FRST')) 1576 for setting, result in pairs: 1577 msg_mboxMMDF.set_flags(setting) 1578 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1579 self.assertEqual(msg.get_flags(), result) 1580 self.assertEqual(msg.get_date(), 0.0) 1581 msg_mboxMMDF.set_flags('O') 1582 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1583 'cur') 1584 1585 def test_mboxmmdf_to_mboxmmdf(self): 1586 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1587 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1588 msg_mboxMMDF = class_(_sample_message) 1589 msg_mboxMMDF.set_flags('RODFA') 1590 msg_mboxMMDF.set_from('foo@bar') 1591 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1592 msg2 = class2_(msg_mboxMMDF) 1593 self.assertEqual(msg2.get_flags(), 'RODFA') 1594 self.assertEqual(msg2.get_from(), 'foo@bar') 1595 1596 def test_mboxmmdf_to_mh(self): 1597 # Convert mboxMessage and MMDFMessage to MHMessage 1598 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1599 msg_mboxMMDF = class_(_sample_message) 1600 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1601 ('F', ['unseen', 'flagged']), 1602 ('A', ['unseen', 'replied']), 1603 ('RODFA', ['replied', 'flagged'])) 1604 for setting, result in pairs: 1605 msg_mboxMMDF.set_flags(setting) 1606 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1607 result) 1608 1609 def test_mboxmmdf_to_babyl(self): 1610 # Convert mboxMessage and MMDFMessage to BabylMessage 1611 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1612 msg = class_(_sample_message) 1613 pairs = (('R', []), ('O', ['unseen']), 1614 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1615 ('A', ['unseen', 'answered']), 1616 ('RODFA', ['deleted', 'answered'])) 1617 for setting, result in pairs: 1618 msg.set_flags(setting) 1619 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1620 1621 def test_mh_to_maildir(self): 1622 # Convert MHMessage to MaildirMessage 1623 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1624 for setting, result in pairs: 1625 msg = mailbox.MHMessage(_sample_message) 1626 msg.add_sequence(setting) 1627 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1628 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1629 msg = mailbox.MHMessage(_sample_message) 1630 msg.add_sequence('unseen') 1631 msg.add_sequence('replied') 1632 msg.add_sequence('flagged') 1633 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1634 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1635 1636 def test_mh_to_mboxmmdf(self): 1637 # Convert MHMessage to mboxMessage and MMDFMessage 1638 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1639 for setting, result in pairs: 1640 msg = mailbox.MHMessage(_sample_message) 1641 msg.add_sequence(setting) 1642 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1643 self.assertEqual(class_(msg).get_flags(), result) 1644 msg = mailbox.MHMessage(_sample_message) 1645 msg.add_sequence('unseen') 1646 msg.add_sequence('replied') 1647 msg.add_sequence('flagged') 1648 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1649 self.assertEqual(class_(msg).get_flags(), 'OFA') 1650 1651 def test_mh_to_mh(self): 1652 # Convert MHMessage to MHMessage 1653 msg = mailbox.MHMessage(_sample_message) 1654 msg.add_sequence('unseen') 1655 msg.add_sequence('replied') 1656 msg.add_sequence('flagged') 1657 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1658 ['unseen', 'replied', 'flagged']) 1659 1660 def test_mh_to_babyl(self): 1661 # Convert MHMessage to BabylMessage 1662 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1663 ('flagged', [])) 1664 for setting, result in pairs: 1665 msg = mailbox.MHMessage(_sample_message) 1666 msg.add_sequence(setting) 1667 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1668 msg = mailbox.MHMessage(_sample_message) 1669 msg.add_sequence('unseen') 1670 msg.add_sequence('replied') 1671 msg.add_sequence('flagged') 1672 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1673 ['unseen', 'answered']) 1674 1675 def test_babyl_to_maildir(self): 1676 # Convert BabylMessage to MaildirMessage 1677 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1678 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1679 ('resent', 'PS')) 1680 for setting, result in pairs: 1681 msg = mailbox.BabylMessage(_sample_message) 1682 msg.add_label(setting) 1683 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1684 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1685 msg = mailbox.BabylMessage(_sample_message) 1686 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1687 'edited', 'resent'): 1688 msg.add_label(label) 1689 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1690 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1691 1692 def test_babyl_to_mboxmmdf(self): 1693 # Convert BabylMessage to mboxMessage and MMDFMessage 1694 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1695 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1696 ('resent', 'RO')) 1697 for setting, result in pairs: 1698 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1699 msg = mailbox.BabylMessage(_sample_message) 1700 msg.add_label(setting) 1701 self.assertEqual(class_(msg).get_flags(), result) 1702 msg = mailbox.BabylMessage(_sample_message) 1703 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1704 'edited', 'resent'): 1705 msg.add_label(label) 1706 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1707 self.assertEqual(class_(msg).get_flags(), 'ODA') 1708 1709 def test_babyl_to_mh(self): 1710 # Convert BabylMessage to MHMessage 1711 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1712 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1713 ('resent', [])) 1714 for setting, result in pairs: 1715 msg = mailbox.BabylMessage(_sample_message) 1716 msg.add_label(setting) 1717 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1718 msg = mailbox.BabylMessage(_sample_message) 1719 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1720 'edited', 'resent'): 1721 msg.add_label(label) 1722 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1723 ['unseen', 'replied']) 1724 1725 def test_babyl_to_babyl(self): 1726 # Convert BabylMessage to BabylMessage 1727 msg = mailbox.BabylMessage(_sample_message) 1728 msg.update_visible() 1729 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1730 'edited', 'resent'): 1731 msg.add_label(label) 1732 msg2 = mailbox.BabylMessage(msg) 1733 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1734 'answered', 'forwarded', 'edited', 1735 'resent']) 1736 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1737 for key in msg.get_visible().keys(): 1738 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1739 1740 1741class TestProxyFileBase(TestBase): 1742 1743 def _test_read(self, proxy): 1744 # Read by byte 1745 proxy.seek(0) 1746 self.assertEqual(proxy.read(), 'bar') 1747 proxy.seek(1) 1748 self.assertEqual(proxy.read(), 'ar') 1749 proxy.seek(0) 1750 self.assertEqual(proxy.read(2), 'ba') 1751 proxy.seek(1) 1752 self.assertEqual(proxy.read(-1), 'ar') 1753 proxy.seek(2) 1754 self.assertEqual(proxy.read(1000), 'r') 1755 1756 def _test_readline(self, proxy): 1757 # Read by line 1758 proxy.seek(0) 1759 self.assertEqual(proxy.readline(), 'foo' + os.linesep) 1760 self.assertEqual(proxy.readline(), 'bar' + os.linesep) 1761 self.assertEqual(proxy.readline(), 'fred' + os.linesep) 1762 self.assertEqual(proxy.readline(), 'bob') 1763 proxy.seek(2) 1764 self.assertEqual(proxy.readline(), 'o' + os.linesep) 1765 proxy.seek(6 + 2 * len(os.linesep)) 1766 self.assertEqual(proxy.readline(), 'fred' + os.linesep) 1767 proxy.seek(6 + 2 * len(os.linesep)) 1768 self.assertEqual(proxy.readline(2), 'fr') 1769 self.assertEqual(proxy.readline(-10), 'ed' + os.linesep) 1770 1771 def _test_readlines(self, proxy): 1772 # Read multiple lines 1773 proxy.seek(0) 1774 self.assertEqual(proxy.readlines(), ['foo' + os.linesep, 1775 'bar' + os.linesep, 1776 'fred' + os.linesep, 'bob']) 1777 proxy.seek(0) 1778 self.assertEqual(proxy.readlines(2), ['foo' + os.linesep]) 1779 proxy.seek(3 + len(os.linesep)) 1780 self.assertEqual(proxy.readlines(4 + len(os.linesep)), 1781 ['bar' + os.linesep, 'fred' + os.linesep]) 1782 proxy.seek(3) 1783 self.assertEqual(proxy.readlines(1000), [os.linesep, 'bar' + os.linesep, 1784 'fred' + os.linesep, 'bob']) 1785 1786 def _test_iteration(self, proxy): 1787 # Iterate by line 1788 proxy.seek(0) 1789 iterator = iter(proxy) 1790 self.assertEqual(list(iterator), 1791 ['foo' + os.linesep, 'bar' + os.linesep, 'fred' + os.linesep, 'bob']) 1792 1793 def _test_seek_and_tell(self, proxy): 1794 # Seek and use tell to check position 1795 proxy.seek(3) 1796 self.assertEqual(proxy.tell(), 3) 1797 self.assertEqual(proxy.read(len(os.linesep)), os.linesep) 1798 proxy.seek(2, 1) 1799 self.assertEqual(proxy.read(1 + len(os.linesep)), 'r' + os.linesep) 1800 proxy.seek(-3 - len(os.linesep), 2) 1801 self.assertEqual(proxy.read(3), 'bar') 1802 proxy.seek(2, 0) 1803 self.assertEqual(proxy.read(), 'o' + os.linesep + 'bar' + os.linesep) 1804 proxy.seek(100) 1805 self.assertEqual(proxy.read(), '') 1806 1807 def _test_close(self, proxy): 1808 # Close a file 1809 proxy.close() 1810 # Issue 11700 subsequent closes should be a no-op, not an error. 1811 proxy.close() 1812 1813 1814class TestProxyFile(TestProxyFileBase, unittest.TestCase): 1815 1816 def setUp(self): 1817 self._path = test_support.TESTFN 1818 self._file = open(self._path, 'wb+') 1819 1820 def tearDown(self): 1821 self._file.close() 1822 self._delete_recursively(self._path) 1823 1824 def test_initialize(self): 1825 # Initialize and check position 1826 self._file.write('foo') 1827 pos = self._file.tell() 1828 proxy0 = mailbox._ProxyFile(self._file) 1829 self.assertEqual(proxy0.tell(), pos) 1830 self.assertEqual(self._file.tell(), pos) 1831 proxy1 = mailbox._ProxyFile(self._file, 0) 1832 self.assertEqual(proxy1.tell(), 0) 1833 self.assertEqual(self._file.tell(), pos) 1834 1835 def test_read(self): 1836 self._file.write('bar') 1837 self._test_read(mailbox._ProxyFile(self._file)) 1838 1839 def test_readline(self): 1840 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1841 os.linesep)) 1842 self._test_readline(mailbox._ProxyFile(self._file)) 1843 1844 def test_readlines(self): 1845 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1846 os.linesep)) 1847 self._test_readlines(mailbox._ProxyFile(self._file)) 1848 1849 def test_iteration(self): 1850 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1851 os.linesep)) 1852 self._test_iteration(mailbox._ProxyFile(self._file)) 1853 1854 def test_seek_and_tell(self): 1855 self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) 1856 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 1857 1858 def test_close(self): 1859 self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) 1860 self._test_close(mailbox._ProxyFile(self._file)) 1861 1862 1863class TestPartialFile(TestProxyFileBase, unittest.TestCase): 1864 1865 def setUp(self): 1866 self._path = test_support.TESTFN 1867 self._file = open(self._path, 'wb+') 1868 1869 def tearDown(self): 1870 self._file.close() 1871 self._delete_recursively(self._path) 1872 1873 def test_initialize(self): 1874 # Initialize and check position 1875 self._file.write('foo' + os.linesep + 'bar') 1876 pos = self._file.tell() 1877 proxy = mailbox._PartialFile(self._file, 2, 5) 1878 self.assertEqual(proxy.tell(), 0) 1879 self.assertEqual(self._file.tell(), pos) 1880 1881 def test_read(self): 1882 self._file.write('***bar***') 1883 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 1884 1885 def test_readline(self): 1886 self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' % 1887 (os.linesep, os.linesep, os.linesep)) 1888 self._test_readline(mailbox._PartialFile(self._file, 5, 1889 18 + 3 * len(os.linesep))) 1890 1891 def test_readlines(self): 1892 self._file.write('foo%sbar%sfred%sbob?????' % 1893 (os.linesep, os.linesep, os.linesep)) 1894 self._test_readlines(mailbox._PartialFile(self._file, 0, 1895 13 + 3 * len(os.linesep))) 1896 1897 def test_iteration(self): 1898 self._file.write('____foo%sbar%sfred%sbob####' % 1899 (os.linesep, os.linesep, os.linesep)) 1900 self._test_iteration(mailbox._PartialFile(self._file, 4, 1901 17 + 3 * len(os.linesep))) 1902 1903 def test_seek_and_tell(self): 1904 self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep)) 1905 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 1906 9 + 2 * len(os.linesep))) 1907 1908 def test_close(self): 1909 self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep)) 1910 self._test_close(mailbox._PartialFile(self._file, 1, 1911 6 + 3 * len(os.linesep))) 1912 1913 1914## Start: tests from the original module (for backward compatibility). 1915 1916FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" 1917DUMMY_MESSAGE = """\ 1918From: some.body@dummy.domain 1919To: me@my.domain 1920Subject: Simple Test 1921 1922This is a dummy message. 1923""" 1924 1925class MaildirTestCase(unittest.TestCase): 1926 1927 def setUp(self): 1928 # create a new maildir mailbox to work with: 1929 self._dir = test_support.TESTFN 1930 if os.path.isdir(self._dir): 1931 test_support.rmtree(self._dir) 1932 if os.path.isfile(self._dir): 1933 test_support.unlink(self._dir) 1934 os.mkdir(self._dir) 1935 os.mkdir(os.path.join(self._dir, "cur")) 1936 os.mkdir(os.path.join(self._dir, "tmp")) 1937 os.mkdir(os.path.join(self._dir, "new")) 1938 self._counter = 1 1939 self._msgfiles = [] 1940 1941 def tearDown(self): 1942 map(os.unlink, self._msgfiles) 1943 test_support.rmdir(os.path.join(self._dir, "cur")) 1944 test_support.rmdir(os.path.join(self._dir, "tmp")) 1945 test_support.rmdir(os.path.join(self._dir, "new")) 1946 test_support.rmdir(self._dir) 1947 1948 def createMessage(self, dir, mbox=False): 1949 t = int(time.time() % 1000000) 1950 pid = self._counter 1951 self._counter += 1 1952 filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain")) 1953 tmpname = os.path.join(self._dir, "tmp", filename) 1954 newname = os.path.join(self._dir, dir, filename) 1955 with open(tmpname, "w") as fp: 1956 self._msgfiles.append(tmpname) 1957 if mbox: 1958 fp.write(FROM_) 1959 fp.write(DUMMY_MESSAGE) 1960 if hasattr(os, "link"): 1961 os.link(tmpname, newname) 1962 else: 1963 with open(newname, "w") as fp: 1964 fp.write(DUMMY_MESSAGE) 1965 self._msgfiles.append(newname) 1966 return tmpname 1967 1968 def test_empty_maildir(self): 1969 """Test an empty maildir mailbox""" 1970 # Test for regression on bug #117490: 1971 # Make sure the boxes attribute actually gets set. 1972 self.mbox = mailbox.Maildir(test_support.TESTFN) 1973 #self.assertTrue(hasattr(self.mbox, "boxes")) 1974 #self.assertEqual(len(self.mbox.boxes), 0) 1975 self.assertIsNone(self.mbox.next()) 1976 self.assertIsNone(self.mbox.next()) 1977 1978 def test_nonempty_maildir_cur(self): 1979 self.createMessage("cur") 1980 self.mbox = mailbox.Maildir(test_support.TESTFN) 1981 #self.assertEqual(len(self.mbox.boxes), 1) 1982 msg = self.mbox.next() 1983 self.assertIsNotNone(msg) 1984 msg.fp.close() 1985 self.assertIsNone(self.mbox.next()) 1986 self.assertIsNone(self.mbox.next()) 1987 1988 def test_nonempty_maildir_new(self): 1989 self.createMessage("new") 1990 self.mbox = mailbox.Maildir(test_support.TESTFN) 1991 #self.assertEqual(len(self.mbox.boxes), 1) 1992 msg = self.mbox.next() 1993 self.assertIsNotNone(msg) 1994 msg.fp.close() 1995 self.assertIsNone(self.mbox.next()) 1996 self.assertIsNone(self.mbox.next()) 1997 1998 def test_nonempty_maildir_both(self): 1999 self.createMessage("cur") 2000 self.createMessage("new") 2001 self.mbox = mailbox.Maildir(test_support.TESTFN) 2002 #self.assertEqual(len(self.mbox.boxes), 2) 2003 msg = self.mbox.next() 2004 self.assertIsNotNone(msg) 2005 msg.fp.close() 2006 msg = self.mbox.next() 2007 self.assertIsNotNone(msg) 2008 msg.fp.close() 2009 self.assertIsNone(self.mbox.next()) 2010 self.assertIsNone(self.mbox.next()) 2011 2012 def test_unix_mbox(self): 2013 ### should be better! 2014 import email.parser 2015 fname = self.createMessage("cur", True) 2016 n = 0 2017 fid = open(fname) 2018 for msg in mailbox.PortableUnixMailbox(fid, 2019 email.parser.Parser().parse): 2020 n += 1 2021 self.assertEqual(msg["subject"], "Simple Test") 2022 self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE)) 2023 fid.close() 2024 self.assertEqual(n, 1) 2025 2026## End: classes from the original module (for backward compatibility). 2027 2028 2029_sample_message = """\ 2030Return-Path: <gkj@gregorykjohnson.com> 2031X-Original-To: gkj+person@localhost 2032Delivered-To: gkj+person@localhost 2033Received: from localhost (localhost [127.0.0.1]) 2034 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2035 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2036Delivered-To: gkj@sundance.gregorykjohnson.com 2037Received: from localhost [127.0.0.1] 2038 by localhost with POP3 (fetchmail-6.2.5) 2039 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2040Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2041 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2042 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2043Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 2044 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2045Date: Wed, 13 Jul 2005 17:23:11 -0400 2046From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> 2047To: gkj@gregorykjohnson.com 2048Subject: Sample message 2049Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> 2050Mime-Version: 1.0 2051Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 2052Content-Disposition: inline 2053User-Agent: Mutt/1.5.9i 2054 2055 2056--NMuMz9nt05w80d4+ 2057Content-Type: text/plain; charset=us-ascii 2058Content-Disposition: inline 2059 2060This is a sample message. 2061 2062-- 2063Gregory K. Johnson 2064 2065--NMuMz9nt05w80d4+ 2066Content-Type: application/octet-stream 2067Content-Disposition: attachment; filename="text.gz" 2068Content-Transfer-Encoding: base64 2069 2070H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 20713FYlAAAA 2072 2073--NMuMz9nt05w80d4+-- 2074""" 2075 2076_sample_headers = { 2077 "Return-Path":"<gkj@gregorykjohnson.com>", 2078 "X-Original-To":"gkj+person@localhost", 2079 "Delivered-To":"gkj+person@localhost", 2080 "Received":"""from localhost (localhost [127.0.0.1]) 2081 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2082 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2083 "Delivered-To":"gkj@sundance.gregorykjohnson.com", 2084 "Received":"""from localhost [127.0.0.1] 2085 by localhost with POP3 (fetchmail-6.2.5) 2086 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2087 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2088 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2089 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2090 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 2091 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2092 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 2093 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", 2094 "To":"gkj@gregorykjohnson.com", 2095 "Subject":"Sample message", 2096 "Mime-Version":"1.0", 2097 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 2098 "Content-Disposition":"inline", 2099 "User-Agent": "Mutt/1.5.9i" } 2100 2101_sample_payloads = ("""This is a sample message. 2102 2103-- 2104Gregory K. Johnson 2105""", 2106"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 21073FYlAAAA 2108""") 2109 2110 2111def test_main(): 2112 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH, 2113 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage, 2114 TestMHMessage, TestBabylMessage, TestMMDFMessage, 2115 TestMessageConversion, TestProxyFile, TestPartialFile, 2116 MaildirTestCase) 2117 test_support.run_unittest(*tests) 2118 test_support.reap_children() 2119 2120 2121if __name__ == '__main__': 2122 test_main() 2123