1import os 2import sys 3import time 4import stat 5import socket 6import email 7import email.message 8import re 9import StringIO 10from test import test_support 11import unittest 12import mailbox 13import glob 14try: 15 import fcntl 16except ImportError: 17 pass 18 19# Silence Py3k warning 20rfc822 = test_support.import_module('rfc822', deprecated=True) 21 22class TestBase(unittest.TestCase): 23 24 def _check_sample(self, msg): 25 # Inspect a mailbox.Message representation of the sample message 26 self.assertIsInstance(msg, email.message.Message) 27 self.assertIsInstance(msg, mailbox.Message) 28 for key, value in _sample_headers.iteritems(): 29 self.assertIn(value, msg.get_all(key)) 30 self.assertTrue(msg.is_multipart()) 31 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 32 for i, payload in enumerate(_sample_payloads): 33 part = msg.get_payload(i) 34 self.assertIsInstance(part, email.message.Message) 35 self.assertNotIsInstance(part, mailbox.Message) 36 self.assertEqual(part.get_payload(), payload) 37 38 def _delete_recursively(self, target): 39 # Delete a file or delete a directory recursively 40 if os.path.isdir(target): 41 for path, dirs, files in os.walk(target, topdown=False): 42 for name in files: 43 os.remove(os.path.join(path, name)) 44 for name in dirs: 45 os.rmdir(os.path.join(path, name)) 46 os.rmdir(target) 47 elif os.path.exists(target): 48 os.remove(target) 49 50 51class TestMailbox(TestBase): 52 53 _factory = None # Overridden by subclasses to reuse tests 54 _template = 'From: foo\n\n%s' 55 56 def setUp(self): 57 self._path = test_support.TESTFN 58 self._delete_recursively(self._path) 59 self._box = self._factory(self._path) 60 61 def tearDown(self): 62 self._box.close() 63 self._delete_recursively(self._path) 64 65 def test_add(self): 66 # Add copies of a sample message 67 keys = [] 68 keys.append(self._box.add(self._template % 0)) 69 self.assertEqual(len(self._box), 1) 70 keys.append(self._box.add(mailbox.Message(_sample_message))) 71 self.assertEqual(len(self._box), 2) 72 keys.append(self._box.add(email.message_from_string(_sample_message))) 73 self.assertEqual(len(self._box), 3) 74 keys.append(self._box.add(StringIO.StringIO(_sample_message))) 75 self.assertEqual(len(self._box), 4) 76 keys.append(self._box.add(_sample_message)) 77 self.assertEqual(len(self._box), 5) 78 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 79 for i in (1, 2, 3, 4): 80 self._check_sample(self._box[keys[i]]) 81 82 def test_remove(self): 83 # Remove messages using remove() 84 self._test_remove_or_delitem(self._box.remove) 85 86 def test_delitem(self): 87 # Remove messages using __delitem__() 88 self._test_remove_or_delitem(self._box.__delitem__) 89 90 def _test_remove_or_delitem(self, method): 91 # (Used by test_remove() and test_delitem().) 92 key0 = self._box.add(self._template % 0) 93 key1 = self._box.add(self._template % 1) 94 self.assertEqual(len(self._box), 2) 95 method(key0) 96 l = len(self._box) 97 self.assertEqual(l, 1) 98 self.assertRaises(KeyError, lambda: self._box[key0]) 99 self.assertRaises(KeyError, lambda: method(key0)) 100 self.assertEqual(self._box.get_string(key1), self._template % 1) 101 key2 = self._box.add(self._template % 2) 102 self.assertEqual(len(self._box), 2) 103 method(key2) 104 l = len(self._box) 105 self.assertEqual(l, 1) 106 self.assertRaises(KeyError, lambda: self._box[key2]) 107 self.assertRaises(KeyError, lambda: method(key2)) 108 self.assertEqual(self._box.get_string(key1), self._template % 1) 109 method(key1) 110 self.assertEqual(len(self._box), 0) 111 self.assertRaises(KeyError, lambda: self._box[key1]) 112 self.assertRaises(KeyError, lambda: method(key1)) 113 114 def test_discard(self, repetitions=10): 115 # Discard messages 116 key0 = self._box.add(self._template % 0) 117 key1 = self._box.add(self._template % 1) 118 self.assertEqual(len(self._box), 2) 119 self._box.discard(key0) 120 self.assertEqual(len(self._box), 1) 121 self.assertRaises(KeyError, lambda: self._box[key0]) 122 self._box.discard(key0) 123 self.assertEqual(len(self._box), 1) 124 self.assertRaises(KeyError, lambda: self._box[key0]) 125 126 def test_get(self): 127 # Retrieve messages using get() 128 key0 = self._box.add(self._template % 0) 129 msg = self._box.get(key0) 130 self.assertEqual(msg['from'], 'foo') 131 self.assertEqual(msg.get_payload(), '0') 132 self.assertIs(self._box.get('foo'), None) 133 self.assertFalse(self._box.get('foo', False)) 134 self._box.close() 135 self._box = self._factory(self._path, factory=rfc822.Message) 136 key1 = self._box.add(self._template % 1) 137 msg = self._box.get(key1) 138 self.assertEqual(msg['from'], 'foo') 139 self.assertEqual(msg.fp.read(), '1') 140 141 def test_getitem(self): 142 # Retrieve message using __getitem__() 143 key0 = self._box.add(self._template % 0) 144 msg = self._box[key0] 145 self.assertEqual(msg['from'], 'foo') 146 self.assertEqual(msg.get_payload(), '0') 147 self.assertRaises(KeyError, lambda: self._box['foo']) 148 self._box.discard(key0) 149 self.assertRaises(KeyError, lambda: self._box[key0]) 150 151 def test_get_message(self): 152 # Get Message representations of messages 153 key0 = self._box.add(self._template % 0) 154 key1 = self._box.add(_sample_message) 155 msg0 = self._box.get_message(key0) 156 self.assertIsInstance(msg0, mailbox.Message) 157 self.assertEqual(msg0['from'], 'foo') 158 self.assertEqual(msg0.get_payload(), '0') 159 self._check_sample(self._box.get_message(key1)) 160 161 def test_get_string(self): 162 # Get string representations of messages 163 key0 = self._box.add(self._template % 0) 164 key1 = self._box.add(_sample_message) 165 self.assertEqual(self._box.get_string(key0), self._template % 0) 166 self.assertEqual(self._box.get_string(key1), _sample_message) 167 168 def test_get_file(self): 169 # Get file representations of messages 170 key0 = self._box.add(self._template % 0) 171 key1 = self._box.add(_sample_message) 172 self.assertEqual(self._box.get_file(key0).read().replace(os.linesep, '\n'), 173 self._template % 0) 174 self.assertEqual(self._box.get_file(key1).read().replace(os.linesep, '\n'), 175 _sample_message) 176 177 def test_iterkeys(self): 178 # Get keys using iterkeys() 179 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 180 181 def test_keys(self): 182 # Get keys using keys() 183 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 184 185 def test_itervalues(self): 186 # Get values using itervalues() 187 self._check_iteration(self._box.itervalues, do_keys=False, 188 do_values=True) 189 190 def test_iter(self): 191 # Get values using __iter__() 192 self._check_iteration(self._box.__iter__, do_keys=False, 193 do_values=True) 194 195 def test_values(self): 196 # Get values using values() 197 self._check_iteration(self._box.values, do_keys=False, do_values=True) 198 199 def test_iteritems(self): 200 # Get keys and values using iteritems() 201 self._check_iteration(self._box.iteritems, do_keys=True, 202 do_values=True) 203 204 def test_items(self): 205 # Get keys and values using items() 206 self._check_iteration(self._box.items, do_keys=True, do_values=True) 207 208 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 209 for value in method(): 210 self.fail("Not empty") 211 keys, values = [], [] 212 for i in xrange(repetitions): 213 keys.append(self._box.add(self._template % i)) 214 values.append(self._template % i) 215 if do_keys and not do_values: 216 returned_keys = list(method()) 217 elif do_values and not do_keys: 218 returned_values = list(method()) 219 else: 220 returned_keys, returned_values = [], [] 221 for key, value in method(): 222 returned_keys.append(key) 223 returned_values.append(value) 224 if do_keys: 225 self.assertEqual(len(keys), len(returned_keys)) 226 self.assertEqual(set(keys), set(returned_keys)) 227 if do_values: 228 count = 0 229 for value in returned_values: 230 self.assertEqual(value['from'], 'foo') 231 self.assertTrue(int(value.get_payload()) < repetitions, 232 (value.get_payload(), repetitions)) 233 count += 1 234 self.assertEqual(len(values), count) 235 236 def test_has_key(self): 237 # Check existence of keys using has_key() 238 self._test_has_key_or_contains(self._box.has_key) 239 240 def test_contains(self): 241 # Check existence of keys using __contains__() 242 self._test_has_key_or_contains(self._box.__contains__) 243 244 def _test_has_key_or_contains(self, method): 245 # (Used by test_has_key() and test_contains().) 246 self.assertFalse(method('foo')) 247 key0 = self._box.add(self._template % 0) 248 self.assertTrue(method(key0)) 249 self.assertFalse(method('foo')) 250 key1 = self._box.add(self._template % 1) 251 self.assertTrue(method(key1)) 252 self.assertTrue(method(key0)) 253 self.assertFalse(method('foo')) 254 self._box.remove(key0) 255 self.assertFalse(method(key0)) 256 self.assertTrue(method(key1)) 257 self.assertFalse(method('foo')) 258 self._box.remove(key1) 259 self.assertFalse(method(key1)) 260 self.assertFalse(method(key0)) 261 self.assertFalse(method('foo')) 262 263 def test_len(self, repetitions=10): 264 # Get message count 265 keys = [] 266 for i in xrange(repetitions): 267 self.assertEqual(len(self._box), i) 268 keys.append(self._box.add(self._template % i)) 269 self.assertEqual(len(self._box), i + 1) 270 for i in xrange(repetitions): 271 self.assertEqual(len(self._box), repetitions - i) 272 self._box.remove(keys[i]) 273 self.assertEqual(len(self._box), repetitions - i - 1) 274 275 def test_set_item(self): 276 # Modify messages using __setitem__() 277 key0 = self._box.add(self._template % 'original 0') 278 self.assertEqual(self._box.get_string(key0), 279 self._template % 'original 0') 280 key1 = self._box.add(self._template % 'original 1') 281 self.assertEqual(self._box.get_string(key1), 282 self._template % 'original 1') 283 self._box[key0] = self._template % 'changed 0' 284 self.assertEqual(self._box.get_string(key0), 285 self._template % 'changed 0') 286 self._box[key1] = self._template % 'changed 1' 287 self.assertEqual(self._box.get_string(key1), 288 self._template % 'changed 1') 289 self._box[key0] = _sample_message 290 self._check_sample(self._box[key0]) 291 self._box[key1] = self._box[key0] 292 self._check_sample(self._box[key1]) 293 self._box[key0] = self._template % 'original 0' 294 self.assertEqual(self._box.get_string(key0), 295 self._template % 'original 0') 296 self._check_sample(self._box[key1]) 297 self.assertRaises(KeyError, 298 lambda: self._box.__setitem__('foo', 'bar')) 299 self.assertRaises(KeyError, lambda: self._box['foo']) 300 self.assertEqual(len(self._box), 2) 301 302 def test_clear(self, iterations=10): 303 # Remove all messages using clear() 304 keys = [] 305 for i in xrange(iterations): 306 self._box.add(self._template % i) 307 for i, key in enumerate(keys): 308 self.assertEqual(self._box.get_string(key), self._template % i) 309 self._box.clear() 310 self.assertEqual(len(self._box), 0) 311 for i, key in enumerate(keys): 312 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 313 314 def test_pop(self): 315 # Get and remove a message using pop() 316 key0 = self._box.add(self._template % 0) 317 self.assertIn(key0, self._box) 318 key1 = self._box.add(self._template % 1) 319 self.assertIn(key1, self._box) 320 self.assertEqual(self._box.pop(key0).get_payload(), '0') 321 self.assertNotIn(key0, self._box) 322 self.assertIn(key1, self._box) 323 key2 = self._box.add(self._template % 2) 324 self.assertIn(key2, self._box) 325 self.assertEqual(self._box.pop(key2).get_payload(), '2') 326 self.assertNotIn(key2, self._box) 327 self.assertIn(key1, self._box) 328 self.assertEqual(self._box.pop(key1).get_payload(), '1') 329 self.assertNotIn(key1, self._box) 330 self.assertEqual(len(self._box), 0) 331 332 def test_popitem(self, iterations=10): 333 # Get and remove an arbitrary (key, message) using popitem() 334 keys = [] 335 for i in xrange(10): 336 keys.append(self._box.add(self._template % i)) 337 seen = [] 338 for i in xrange(10): 339 key, msg = self._box.popitem() 340 self.assertIn(key, keys) 341 self.assertNotIn(key, seen) 342 seen.append(key) 343 self.assertEqual(int(msg.get_payload()), keys.index(key)) 344 self.assertEqual(len(self._box), 0) 345 for key in keys: 346 self.assertRaises(KeyError, lambda: self._box[key]) 347 348 def test_update(self): 349 # Modify multiple messages using update() 350 key0 = self._box.add(self._template % 'original 0') 351 key1 = self._box.add(self._template % 'original 1') 352 key2 = self._box.add(self._template % 'original 2') 353 self._box.update({key0: self._template % 'changed 0', 354 key2: _sample_message}) 355 self.assertEqual(len(self._box), 3) 356 self.assertEqual(self._box.get_string(key0), 357 self._template % 'changed 0') 358 self.assertEqual(self._box.get_string(key1), 359 self._template % 'original 1') 360 self._check_sample(self._box[key2]) 361 self._box.update([(key2, self._template % 'changed 2'), 362 (key1, self._template % 'changed 1'), 363 (key0, self._template % 'original 0')]) 364 self.assertEqual(len(self._box), 3) 365 self.assertEqual(self._box.get_string(key0), 366 self._template % 'original 0') 367 self.assertEqual(self._box.get_string(key1), 368 self._template % 'changed 1') 369 self.assertEqual(self._box.get_string(key2), 370 self._template % 'changed 2') 371 self.assertRaises(KeyError, 372 lambda: self._box.update({'foo': 'bar', 373 key0: self._template % "changed 0"})) 374 self.assertEqual(len(self._box), 3) 375 self.assertEqual(self._box.get_string(key0), 376 self._template % "changed 0") 377 self.assertEqual(self._box.get_string(key1), 378 self._template % "changed 1") 379 self.assertEqual(self._box.get_string(key2), 380 self._template % "changed 2") 381 382 def test_flush(self): 383 # Write changes to disk 384 self._test_flush_or_close(self._box.flush, True) 385 386 def test_lock_unlock(self): 387 # Lock and unlock the mailbox 388 self.assertFalse(os.path.exists(self._get_lock_path())) 389 self._box.lock() 390 self.assertTrue(os.path.exists(self._get_lock_path())) 391 self._box.unlock() 392 self.assertFalse(os.path.exists(self._get_lock_path())) 393 394 def test_close(self): 395 # Close mailbox and flush changes to disk 396 self._test_flush_or_close(self._box.close, False) 397 398 def _test_flush_or_close(self, method, should_call_close): 399 contents = [self._template % i for i in xrange(3)] 400 self._box.add(contents[0]) 401 self._box.add(contents[1]) 402 self._box.add(contents[2]) 403 method() 404 if should_call_close: 405 self._box.close() 406 self._box = self._factory(self._path) 407 keys = self._box.keys() 408 self.assertEqual(len(keys), 3) 409 for key in keys: 410 self.assertIn(self._box.get_string(key), contents) 411 412 def test_dump_message(self): 413 # Write message representations to disk 414 for input in (email.message_from_string(_sample_message), 415 _sample_message, StringIO.StringIO(_sample_message)): 416 output = StringIO.StringIO() 417 self._box._dump_message(input, output) 418 self.assertEqual(output.getvalue(), 419 _sample_message.replace('\n', os.linesep)) 420 output = StringIO.StringIO() 421 self.assertRaises(TypeError, 422 lambda: self._box._dump_message(None, output)) 423 424 def _get_lock_path(self): 425 # Return the path of the dot lock file. May be overridden. 426 return self._path + '.lock' 427 428 429class TestMailboxSuperclass(TestBase): 430 431 def test_notimplemented(self): 432 # Test that all Mailbox methods raise NotImplementedException. 433 box = mailbox.Mailbox('path') 434 self.assertRaises(NotImplementedError, lambda: box.add('')) 435 self.assertRaises(NotImplementedError, lambda: box.remove('')) 436 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 437 self.assertRaises(NotImplementedError, lambda: box.discard('')) 438 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 439 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 440 self.assertRaises(NotImplementedError, lambda: box.keys()) 441 self.assertRaises(NotImplementedError, lambda: box.itervalues().next()) 442 self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) 443 self.assertRaises(NotImplementedError, lambda: box.values()) 444 self.assertRaises(NotImplementedError, lambda: box.iteritems().next()) 445 self.assertRaises(NotImplementedError, lambda: box.items()) 446 self.assertRaises(NotImplementedError, lambda: box.get('')) 447 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 448 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 449 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 450 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 451 self.assertRaises(NotImplementedError, lambda: box.has_key('')) 452 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 453 self.assertRaises(NotImplementedError, lambda: box.__len__()) 454 self.assertRaises(NotImplementedError, lambda: box.clear()) 455 self.assertRaises(NotImplementedError, lambda: box.pop('')) 456 self.assertRaises(NotImplementedError, lambda: box.popitem()) 457 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 458 self.assertRaises(NotImplementedError, lambda: box.flush()) 459 self.assertRaises(NotImplementedError, lambda: box.lock()) 460 self.assertRaises(NotImplementedError, lambda: box.unlock()) 461 self.assertRaises(NotImplementedError, lambda: box.close()) 462 463 464class TestMaildir(TestMailbox): 465 466 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 467 468 def setUp(self): 469 TestMailbox.setUp(self) 470 if os.name in ('nt', 'os2') or sys.platform == 'cygwin': 471 self._box.colon = '!' 472 473 def test_add_MM(self): 474 # Add a MaildirMessage instance 475 msg = mailbox.MaildirMessage(self._template % 0) 476 msg.set_subdir('cur') 477 msg.set_info('foo') 478 key = self._box.add(msg) 479 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 480 (key, self._box.colon)))) 481 482 def test_get_MM(self): 483 # Get a MaildirMessage instance 484 msg = mailbox.MaildirMessage(self._template % 0) 485 msg.set_subdir('cur') 486 msg.set_flags('RF') 487 key = self._box.add(msg) 488 msg_returned = self._box.get_message(key) 489 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 490 self.assertEqual(msg_returned.get_subdir(), 'cur') 491 self.assertEqual(msg_returned.get_flags(), 'FR') 492 493 def test_set_MM(self): 494 # Set with a MaildirMessage instance 495 msg0 = mailbox.MaildirMessage(self._template % 0) 496 msg0.set_flags('TP') 497 key = self._box.add(msg0) 498 msg_returned = self._box.get_message(key) 499 self.assertEqual(msg_returned.get_subdir(), 'new') 500 self.assertEqual(msg_returned.get_flags(), 'PT') 501 msg1 = mailbox.MaildirMessage(self._template % 1) 502 self._box[key] = msg1 503 msg_returned = self._box.get_message(key) 504 self.assertEqual(msg_returned.get_subdir(), 'new') 505 self.assertEqual(msg_returned.get_flags(), '') 506 self.assertEqual(msg_returned.get_payload(), '1') 507 msg2 = mailbox.MaildirMessage(self._template % 2) 508 msg2.set_info('2,S') 509 self._box[key] = msg2 510 self._box[key] = self._template % 3 511 msg_returned = self._box.get_message(key) 512 self.assertEqual(msg_returned.get_subdir(), 'new') 513 self.assertEqual(msg_returned.get_flags(), 'S') 514 self.assertEqual(msg_returned.get_payload(), '3') 515 516 def test_consistent_factory(self): 517 # Add a message. 518 msg = mailbox.MaildirMessage(self._template % 0) 519 msg.set_subdir('cur') 520 msg.set_flags('RF') 521 key = self._box.add(msg) 522 523 # Create new mailbox with 524 class FakeMessage(mailbox.MaildirMessage): 525 pass 526 box = mailbox.Maildir(self._path, factory=FakeMessage) 527 box.colon = self._box.colon 528 msg2 = box.get_message(key) 529 self.assertIsInstance(msg2, FakeMessage) 530 531 def test_initialize_new(self): 532 # Initialize a non-existent mailbox 533 self.tearDown() 534 self._box = mailbox.Maildir(self._path) 535 self._check_basics(factory=rfc822.Message) 536 self._delete_recursively(self._path) 537 self._box = self._factory(self._path, factory=None) 538 self._check_basics() 539 540 def test_initialize_existing(self): 541 # Initialize an existing mailbox 542 self.tearDown() 543 for subdir in '', 'tmp', 'new', 'cur': 544 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 545 self._box = mailbox.Maildir(self._path) 546 self._check_basics(factory=rfc822.Message) 547 self._box = mailbox.Maildir(self._path, factory=None) 548 self._check_basics() 549 550 def _check_basics(self, factory=None): 551 # (Used by test_open_new() and test_open_existing().) 552 self.assertEqual(self._box._path, os.path.abspath(self._path)) 553 self.assertEqual(self._box._factory, factory) 554 for subdir in '', 'tmp', 'new', 'cur': 555 path = os.path.join(self._path, subdir) 556 mode = os.stat(path)[stat.ST_MODE] 557 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 558 559 def test_list_folders(self): 560 # List folders 561 self._box.add_folder('one') 562 self._box.add_folder('two') 563 self._box.add_folder('three') 564 self.assertEqual(len(self._box.list_folders()), 3) 565 self.assertEqual(set(self._box.list_folders()), 566 set(('one', 'two', 'three'))) 567 568 def test_get_folder(self): 569 # Open folders 570 self._box.add_folder('foo.bar') 571 folder0 = self._box.get_folder('foo.bar') 572 folder0.add(self._template % 'bar') 573 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 574 folder1 = self._box.get_folder('foo.bar') 575 self.assertEqual(folder1.get_string(folder1.keys()[0]), 576 self._template % 'bar') 577 578 def test_add_and_remove_folders(self): 579 # Delete folders 580 self._box.add_folder('one') 581 self._box.add_folder('two') 582 self.assertEqual(len(self._box.list_folders()), 2) 583 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 584 self._box.remove_folder('one') 585 self.assertEqual(len(self._box.list_folders()), 1) 586 self.assertEqual(set(self._box.list_folders()), set(('two',))) 587 self._box.add_folder('three') 588 self.assertEqual(len(self._box.list_folders()), 2) 589 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 590 self._box.remove_folder('three') 591 self.assertEqual(len(self._box.list_folders()), 1) 592 self.assertEqual(set(self._box.list_folders()), set(('two',))) 593 self._box.remove_folder('two') 594 self.assertEqual(len(self._box.list_folders()), 0) 595 self.assertEqual(self._box.list_folders(), []) 596 597 def test_clean(self): 598 # Remove old files from 'tmp' 599 foo_path = os.path.join(self._path, 'tmp', 'foo') 600 bar_path = os.path.join(self._path, 'tmp', 'bar') 601 with open(foo_path, 'w') as f: 602 f.write("@") 603 with open(bar_path, 'w') as f: 604 f.write("@") 605 self._box.clean() 606 self.assertTrue(os.path.exists(foo_path)) 607 self.assertTrue(os.path.exists(bar_path)) 608 foo_stat = os.stat(foo_path) 609 os.utime(foo_path, (time.time() - 129600 - 2, 610 foo_stat.st_mtime)) 611 self._box.clean() 612 self.assertFalse(os.path.exists(foo_path)) 613 self.assertTrue(os.path.exists(bar_path)) 614 615 def test_create_tmp(self, repetitions=10): 616 # Create files in tmp directory 617 hostname = socket.gethostname() 618 if '/' in hostname: 619 hostname = hostname.replace('/', r'\057') 620 if ':' in hostname: 621 hostname = hostname.replace(':', r'\072') 622 pid = os.getpid() 623 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 624 r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)") 625 previous_groups = None 626 for x in xrange(repetitions): 627 tmp_file = self._box._create_tmp() 628 head, tail = os.path.split(tmp_file.name) 629 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 630 "tmp")), 631 "File in wrong location: '%s'" % head) 632 match = pattern.match(tail) 633 self.assertTrue(match is not None, "Invalid file name: '%s'" % tail) 634 groups = match.groups() 635 if previous_groups is not None: 636 self.assertTrue(int(groups[0] >= previous_groups[0]), 637 "Non-monotonic seconds: '%s' before '%s'" % 638 (previous_groups[0], groups[0])) 639 self.assertTrue(int(groups[1] >= previous_groups[1]) or 640 groups[0] != groups[1], 641 "Non-monotonic milliseconds: '%s' before '%s'" % 642 (previous_groups[1], groups[1])) 643 self.assertTrue(int(groups[2]) == pid, 644 "Process ID mismatch: '%s' should be '%s'" % 645 (groups[2], pid)) 646 self.assertTrue(int(groups[3]) == int(previous_groups[3]) + 1, 647 "Non-sequential counter: '%s' before '%s'" % 648 (previous_groups[3], groups[3])) 649 self.assertTrue(groups[4] == hostname, 650 "Host name mismatch: '%s' should be '%s'" % 651 (groups[4], hostname)) 652 previous_groups = groups 653 tmp_file.write(_sample_message) 654 tmp_file.seek(0) 655 self.assertTrue(tmp_file.read() == _sample_message) 656 tmp_file.close() 657 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 658 self.assertTrue(file_count == repetitions, 659 "Wrong file count: '%s' should be '%s'" % 660 (file_count, repetitions)) 661 662 def test_refresh(self): 663 # Update the table of contents 664 self.assertEqual(self._box._toc, {}) 665 key0 = self._box.add(self._template % 0) 666 key1 = self._box.add(self._template % 1) 667 self.assertEqual(self._box._toc, {}) 668 self._box._refresh() 669 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 670 key1: os.path.join('new', key1)}) 671 key2 = self._box.add(self._template % 2) 672 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 673 key1: os.path.join('new', key1)}) 674 self._box._refresh() 675 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 676 key1: os.path.join('new', key1), 677 key2: os.path.join('new', key2)}) 678 679 def test_lookup(self): 680 # Look up message subpaths in the TOC 681 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 682 key0 = self._box.add(self._template % 0) 683 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 684 os.remove(os.path.join(self._path, 'new', key0)) 685 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 686 # Be sure that the TOC is read back from disk (see issue #6896 687 # about bad mtime behaviour on some systems). 688 self._box.flush() 689 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 690 self.assertEqual(self._box._toc, {}) 691 692 def test_lock_unlock(self): 693 # Lock and unlock the mailbox. For Maildir, this does nothing. 694 self._box.lock() 695 self._box.unlock() 696 697 def test_folder (self): 698 # Test for bug #1569790: verify that folders returned by .get_folder() 699 # use the same factory function. 700 def dummy_factory (s): 701 return None 702 box = self._factory(self._path, factory=dummy_factory) 703 folder = box.add_folder('folder1') 704 self.assertIs(folder._factory, dummy_factory) 705 706 folder1_alias = box.get_folder('folder1') 707 self.assertIs(folder1_alias._factory, dummy_factory) 708 709 def test_directory_in_folder (self): 710 # Test that mailboxes still work if there's a stray extra directory 711 # in a folder. 712 for i in range(10): 713 self._box.add(mailbox.Message(_sample_message)) 714 715 # Create a stray directory 716 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 717 718 # Check that looping still works with the directory present. 719 for msg in self._box: 720 pass 721 722 def test_file_permissions(self): 723 # Verify that message files are created without execute permissions 724 if not hasattr(os, "stat") or not hasattr(os, "umask"): 725 return 726 msg = mailbox.MaildirMessage(self._template % 0) 727 orig_umask = os.umask(0) 728 try: 729 key = self._box.add(msg) 730 finally: 731 os.umask(orig_umask) 732 path = os.path.join(self._path, self._box._lookup(key)) 733 mode = os.stat(path).st_mode 734 self.assertEqual(mode & 0111, 0) 735 736 def test_folder_file_perms(self): 737 # From bug #3228, we want to verify that the file created inside a Maildir 738 # subfolder isn't marked as executable. 739 if not hasattr(os, "stat") or not hasattr(os, "umask"): 740 return 741 742 orig_umask = os.umask(0) 743 try: 744 subfolder = self._box.add_folder('subfolder') 745 finally: 746 os.umask(orig_umask) 747 748 path = os.path.join(subfolder._path, 'maildirfolder') 749 st = os.stat(path) 750 perms = st.st_mode 751 self.assertFalse((perms & 0111)) # Execute bits should all be off. 752 753 def test_reread(self): 754 755 # Put the last modified times more than two seconds into the past 756 # (because mtime may have only a two second granularity). 757 for subdir in ('cur', 'new'): 758 os.utime(os.path.join(self._box._path, subdir), 759 (time.time()-5,)*2) 760 761 # Because mtime has a two second granularity in worst case (FAT), a 762 # refresh is done unconditionally if called for within 763 # two-second-plus-a-bit of the last one, just in case the mbox has 764 # changed; so now we have to wait for that interval to expire. 765 time.sleep(2.01 + self._box._skewfactor) 766 767 # Re-reading causes the ._toc attribute to be assigned a new dictionary 768 # object, so we'll check that the ._toc attribute isn't a different 769 # object. 770 orig_toc = self._box._toc 771 def refreshed(): 772 return self._box._toc is not orig_toc 773 774 self._box._refresh() 775 self.assertFalse(refreshed()) 776 777 # Now, write something into cur and remove it. This changes 778 # the mtime and should cause a re-read. 779 filename = os.path.join(self._path, 'cur', 'stray-file') 780 f = open(filename, 'w') 781 f.close() 782 os.unlink(filename) 783 self._box._refresh() 784 self.assertTrue(refreshed()) 785 786class _TestMboxMMDF(TestMailbox): 787 788 def tearDown(self): 789 self._box.close() 790 self._delete_recursively(self._path) 791 for lock_remnant in glob.glob(self._path + '.*'): 792 test_support.unlink(lock_remnant) 793 794 def test_add_from_string(self): 795 # Add a string starting with 'From ' to the mailbox 796 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0') 797 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 798 self.assertEqual(self._box[key].get_payload(), '0') 799 800 def test_add_mbox_or_mmdf_message(self): 801 # Add an mboxMessage or MMDFMessage 802 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 803 msg = class_('From foo@bar blah\nFrom: foo\n\n0') 804 key = self._box.add(msg) 805 806 def test_open_close_open(self): 807 # Open and inspect previously-created mailbox 808 values = [self._template % i for i in xrange(3)] 809 for value in values: 810 self._box.add(value) 811 self._box.close() 812 mtime = os.path.getmtime(self._path) 813 self._box = self._factory(self._path) 814 self.assertEqual(len(self._box), 3) 815 for key in self._box.iterkeys(): 816 self.assertIn(self._box.get_string(key), values) 817 self._box.close() 818 self.assertEqual(mtime, os.path.getmtime(self._path)) 819 820 def test_add_and_close(self): 821 # Verifying that closing a mailbox doesn't change added items 822 self._box.add(_sample_message) 823 for i in xrange(3): 824 self._box.add(self._template % i) 825 self._box.add(_sample_message) 826 self._box._file.flush() 827 self._box._file.seek(0) 828 contents = self._box._file.read() 829 self._box.close() 830 with open(self._path, 'rb') as f: 831 self.assertEqual(contents, f.read()) 832 self._box = self._factory(self._path) 833 834 def test_lock_conflict(self): 835 # Fork off a subprocess that will lock the file for 2 seconds, 836 # unlock it, and then exit. 837 if not hasattr(os, 'fork'): 838 return 839 pid = os.fork() 840 if pid == 0: 841 # In the child, lock the mailbox. 842 self._box.lock() 843 time.sleep(2) 844 self._box.unlock() 845 os._exit(0) 846 847 # In the parent, sleep a bit to give the child time to acquire 848 # the lock. 849 time.sleep(0.5) 850 try: 851 self.assertRaises(mailbox.ExternalClashError, 852 self._box.lock) 853 finally: 854 # Wait for child to exit. Locking should now succeed. 855 exited_pid, status = os.waitpid(pid, 0) 856 857 self._box.lock() 858 self._box.unlock() 859 860 def test_relock(self): 861 # Test case for bug #1575506: the mailbox class was locking the 862 # wrong file object in its flush() method. 863 msg = "Subject: sub\n\nbody\n" 864 key1 = self._box.add(msg) 865 self._box.flush() 866 self._box.close() 867 868 self._box = self._factory(self._path) 869 self._box.lock() 870 key2 = self._box.add(msg) 871 self._box.flush() 872 self.assertTrue(self._box._locked) 873 self._box.close() 874 875 876class TestMbox(_TestMboxMMDF): 877 878 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 879 880 def test_file_perms(self): 881 # From bug #3228, we want to verify that the mailbox file isn't executable, 882 # even if the umask is set to something that would leave executable bits set. 883 # We only run this test on platforms that support umask. 884 if hasattr(os, 'umask') and hasattr(os, 'stat'): 885 try: 886 old_umask = os.umask(0077) 887 self._box.close() 888 os.unlink(self._path) 889 self._box = mailbox.mbox(self._path, create=True) 890 self._box.add('') 891 self._box.close() 892 finally: 893 os.umask(old_umask) 894 895 st = os.stat(self._path) 896 perms = st.st_mode 897 self.assertFalse((perms & 0111)) # Execute bits should all be off. 898 899class TestMMDF(_TestMboxMMDF): 900 901 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 902 903 904class TestMH(TestMailbox): 905 906 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 907 908 def test_list_folders(self): 909 # List folders 910 self._box.add_folder('one') 911 self._box.add_folder('two') 912 self._box.add_folder('three') 913 self.assertEqual(len(self._box.list_folders()), 3) 914 self.assertEqual(set(self._box.list_folders()), 915 set(('one', 'two', 'three'))) 916 917 def test_get_folder(self): 918 # Open folders 919 def dummy_factory (s): 920 return None 921 self._box = self._factory(self._path, dummy_factory) 922 923 new_folder = self._box.add_folder('foo.bar') 924 folder0 = self._box.get_folder('foo.bar') 925 folder0.add(self._template % 'bar') 926 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 927 folder1 = self._box.get_folder('foo.bar') 928 self.assertEqual(folder1.get_string(folder1.keys()[0]), 929 self._template % 'bar') 930 931 # Test for bug #1569790: verify that folders returned by .get_folder() 932 # use the same factory function. 933 self.assertIs(new_folder._factory, self._box._factory) 934 self.assertIs(folder0._factory, self._box._factory) 935 936 def test_add_and_remove_folders(self): 937 # Delete folders 938 self._box.add_folder('one') 939 self._box.add_folder('two') 940 self.assertEqual(len(self._box.list_folders()), 2) 941 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 942 self._box.remove_folder('one') 943 self.assertEqual(len(self._box.list_folders()), 1) 944 self.assertEqual(set(self._box.list_folders()), set(('two', ))) 945 self._box.add_folder('three') 946 self.assertEqual(len(self._box.list_folders()), 2) 947 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 948 self._box.remove_folder('three') 949 self.assertEqual(len(self._box.list_folders()), 1) 950 self.assertEqual(set(self._box.list_folders()), set(('two', ))) 951 self._box.remove_folder('two') 952 self.assertEqual(len(self._box.list_folders()), 0) 953 self.assertEqual(self._box.list_folders(), []) 954 955 def test_sequences(self): 956 # Get and set sequences 957 self.assertEqual(self._box.get_sequences(), {}) 958 msg0 = mailbox.MHMessage(self._template % 0) 959 msg0.add_sequence('foo') 960 key0 = self._box.add(msg0) 961 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 962 msg1 = mailbox.MHMessage(self._template % 1) 963 msg1.set_sequences(['bar', 'replied', 'foo']) 964 key1 = self._box.add(msg1) 965 self.assertEqual(self._box.get_sequences(), 966 {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) 967 msg0.set_sequences(['flagged']) 968 self._box[key0] = msg0 969 self.assertEqual(self._box.get_sequences(), 970 {'foo':[key1], 'bar':[key1], 'replied':[key1], 971 'flagged':[key0]}) 972 self._box.remove(key1) 973 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 974 975 def test_issue2625(self): 976 msg0 = mailbox.MHMessage(self._template % 0) 977 msg0.add_sequence('foo') 978 key0 = self._box.add(msg0) 979 refmsg0 = self._box.get_message(key0) 980 981 def test_issue7627(self): 982 msg0 = mailbox.MHMessage(self._template % 0) 983 key0 = self._box.add(msg0) 984 self._box.lock() 985 self._box.remove(key0) 986 self._box.unlock() 987 988 def test_pack(self): 989 # Pack the contents of the mailbox 990 msg0 = mailbox.MHMessage(self._template % 0) 991 msg1 = mailbox.MHMessage(self._template % 1) 992 msg2 = mailbox.MHMessage(self._template % 2) 993 msg3 = mailbox.MHMessage(self._template % 3) 994 msg0.set_sequences(['foo', 'unseen']) 995 msg1.set_sequences(['foo']) 996 msg2.set_sequences(['foo', 'flagged']) 997 msg3.set_sequences(['foo', 'bar', 'replied']) 998 key0 = self._box.add(msg0) 999 key1 = self._box.add(msg1) 1000 key2 = self._box.add(msg2) 1001 key3 = self._box.add(msg3) 1002 self.assertEqual(self._box.get_sequences(), 1003 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1004 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1005 self._box.remove(key2) 1006 self.assertEqual(self._box.get_sequences(), 1007 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1008 'replied':[key3]}) 1009 self._box.pack() 1010 self.assertEqual(self._box.keys(), [1, 2, 3]) 1011 key0 = key0 1012 key1 = key0 + 1 1013 key2 = key1 + 1 1014 self.assertEqual(self._box.get_sequences(), 1015 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1016 1017 # Test case for packing while holding the mailbox locked. 1018 key0 = self._box.add(msg1) 1019 key1 = self._box.add(msg1) 1020 key2 = self._box.add(msg1) 1021 key3 = self._box.add(msg1) 1022 1023 self._box.remove(key0) 1024 self._box.remove(key2) 1025 self._box.lock() 1026 self._box.pack() 1027 self._box.unlock() 1028 self.assertEqual(self._box.get_sequences(), 1029 {'foo':[1, 2, 3, 4, 5], 1030 'unseen':[1], 'bar':[3], 'replied':[3]}) 1031 1032 def _get_lock_path(self): 1033 return os.path.join(self._path, '.mh_sequences.lock') 1034 1035 1036class TestBabyl(TestMailbox): 1037 1038 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1039 1040 def tearDown(self): 1041 self._box.close() 1042 self._delete_recursively(self._path) 1043 for lock_remnant in glob.glob(self._path + '.*'): 1044 test_support.unlink(lock_remnant) 1045 1046 def test_labels(self): 1047 # Get labels from the mailbox 1048 self.assertEqual(self._box.get_labels(), []) 1049 msg0 = mailbox.BabylMessage(self._template % 0) 1050 msg0.add_label('foo') 1051 key0 = self._box.add(msg0) 1052 self.assertEqual(self._box.get_labels(), ['foo']) 1053 msg1 = mailbox.BabylMessage(self._template % 1) 1054 msg1.set_labels(['bar', 'answered', 'foo']) 1055 key1 = self._box.add(msg1) 1056 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1057 msg0.set_labels(['blah', 'filed']) 1058 self._box[key0] = msg0 1059 self.assertEqual(set(self._box.get_labels()), 1060 set(['foo', 'bar', 'blah'])) 1061 self._box.remove(key1) 1062 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1063 1064 1065class TestMessage(TestBase): 1066 1067 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1068 1069 def setUp(self): 1070 self._path = test_support.TESTFN 1071 1072 def tearDown(self): 1073 self._delete_recursively(self._path) 1074 1075 def test_initialize_with_eMM(self): 1076 # Initialize based on email.message.Message instance 1077 eMM = email.message_from_string(_sample_message) 1078 msg = self._factory(eMM) 1079 self._post_initialize_hook(msg) 1080 self._check_sample(msg) 1081 1082 def test_initialize_with_string(self): 1083 # Initialize based on string 1084 msg = self._factory(_sample_message) 1085 self._post_initialize_hook(msg) 1086 self._check_sample(msg) 1087 1088 def test_initialize_with_file(self): 1089 # Initialize based on contents of file 1090 with open(self._path, 'w+') as f: 1091 f.write(_sample_message) 1092 f.seek(0) 1093 msg = self._factory(f) 1094 self._post_initialize_hook(msg) 1095 self._check_sample(msg) 1096 1097 def test_initialize_with_nothing(self): 1098 # Initialize without arguments 1099 msg = self._factory() 1100 self._post_initialize_hook(msg) 1101 self.assertIsInstance(msg, email.message.Message) 1102 self.assertIsInstance(msg, mailbox.Message) 1103 self.assertIsInstance(msg, self._factory) 1104 self.assertEqual(msg.keys(), []) 1105 self.assertFalse(msg.is_multipart()) 1106 self.assertEqual(msg.get_payload(), None) 1107 1108 def test_initialize_incorrectly(self): 1109 # Initialize with invalid argument 1110 self.assertRaises(TypeError, lambda: self._factory(object())) 1111 1112 def test_become_message(self): 1113 # Take on the state of another message 1114 eMM = email.message_from_string(_sample_message) 1115 msg = self._factory() 1116 msg._become_message(eMM) 1117 self._check_sample(msg) 1118 1119 def test_explain_to(self): 1120 # Copy self's format-specific data to other message formats. 1121 # This test is superficial; better ones are in TestMessageConversion. 1122 msg = self._factory() 1123 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1124 mailbox.mboxMessage, mailbox.MHMessage, 1125 mailbox.BabylMessage, mailbox.MMDFMessage): 1126 other_msg = class_() 1127 msg._explain_to(other_msg) 1128 other_msg = email.message.Message() 1129 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1130 1131 def _post_initialize_hook(self, msg): 1132 # Overridden by subclasses to check extra things after initialization 1133 pass 1134 1135 1136class TestMaildirMessage(TestMessage): 1137 1138 _factory = mailbox.MaildirMessage 1139 1140 def _post_initialize_hook(self, msg): 1141 self.assertEqual(msg._subdir, 'new') 1142 self.assertEqual(msg._info,'') 1143 1144 def test_subdir(self): 1145 # Use get_subdir() and set_subdir() 1146 msg = mailbox.MaildirMessage(_sample_message) 1147 self.assertEqual(msg.get_subdir(), 'new') 1148 msg.set_subdir('cur') 1149 self.assertEqual(msg.get_subdir(), 'cur') 1150 msg.set_subdir('new') 1151 self.assertEqual(msg.get_subdir(), 'new') 1152 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1153 self.assertEqual(msg.get_subdir(), 'new') 1154 msg.set_subdir('new') 1155 self.assertEqual(msg.get_subdir(), 'new') 1156 self._check_sample(msg) 1157 1158 def test_flags(self): 1159 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1160 msg = mailbox.MaildirMessage(_sample_message) 1161 self.assertEqual(msg.get_flags(), '') 1162 self.assertEqual(msg.get_subdir(), 'new') 1163 msg.set_flags('F') 1164 self.assertEqual(msg.get_subdir(), 'new') 1165 self.assertEqual(msg.get_flags(), 'F') 1166 msg.set_flags('SDTP') 1167 self.assertEqual(msg.get_flags(), 'DPST') 1168 msg.add_flag('FT') 1169 self.assertEqual(msg.get_flags(), 'DFPST') 1170 msg.remove_flag('TDRP') 1171 self.assertEqual(msg.get_flags(), 'FS') 1172 self.assertEqual(msg.get_subdir(), 'new') 1173 self._check_sample(msg) 1174 1175 def test_date(self): 1176 # Use get_date() and set_date() 1177 msg = mailbox.MaildirMessage(_sample_message) 1178 diff = msg.get_date() - time.time() 1179 self.assertTrue(abs(diff) < 60, diff) 1180 msg.set_date(0.0) 1181 self.assertEqual(msg.get_date(), 0.0) 1182 1183 def test_info(self): 1184 # Use get_info() and set_info() 1185 msg = mailbox.MaildirMessage(_sample_message) 1186 self.assertEqual(msg.get_info(), '') 1187 msg.set_info('1,foo=bar') 1188 self.assertEqual(msg.get_info(), '1,foo=bar') 1189 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1190 self._check_sample(msg) 1191 1192 def test_info_and_flags(self): 1193 # Test interaction of info and flag methods 1194 msg = mailbox.MaildirMessage(_sample_message) 1195 self.assertEqual(msg.get_info(), '') 1196 msg.set_flags('SF') 1197 self.assertEqual(msg.get_flags(), 'FS') 1198 self.assertEqual(msg.get_info(), '2,FS') 1199 msg.set_info('1,') 1200 self.assertEqual(msg.get_flags(), '') 1201 self.assertEqual(msg.get_info(), '1,') 1202 msg.remove_flag('RPT') 1203 self.assertEqual(msg.get_flags(), '') 1204 self.assertEqual(msg.get_info(), '1,') 1205 msg.add_flag('D') 1206 self.assertEqual(msg.get_flags(), 'D') 1207 self.assertEqual(msg.get_info(), '2,D') 1208 self._check_sample(msg) 1209 1210 1211class _TestMboxMMDFMessage(TestMessage): 1212 1213 _factory = mailbox._mboxMMDFMessage 1214 1215 def _post_initialize_hook(self, msg): 1216 self._check_from(msg) 1217 1218 def test_initialize_with_unixfrom(self): 1219 # Initialize with a message that already has a _unixfrom attribute 1220 msg = mailbox.Message(_sample_message) 1221 msg.set_unixfrom('From foo@bar blah') 1222 msg = mailbox.mboxMessage(msg) 1223 self.assertEqual(msg.get_from(), 'foo@bar blah') 1224 1225 def test_from(self): 1226 # Get and set "From " line 1227 msg = mailbox.mboxMessage(_sample_message) 1228 self._check_from(msg) 1229 msg.set_from('foo bar') 1230 self.assertEqual(msg.get_from(), 'foo bar') 1231 msg.set_from('foo@bar', True) 1232 self._check_from(msg, 'foo@bar') 1233 msg.set_from('blah@temp', time.localtime()) 1234 self._check_from(msg, 'blah@temp') 1235 1236 def test_flags(self): 1237 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1238 msg = mailbox.mboxMessage(_sample_message) 1239 self.assertEqual(msg.get_flags(), '') 1240 msg.set_flags('F') 1241 self.assertEqual(msg.get_flags(), 'F') 1242 msg.set_flags('XODR') 1243 self.assertEqual(msg.get_flags(), 'RODX') 1244 msg.add_flag('FA') 1245 self.assertEqual(msg.get_flags(), 'RODFAX') 1246 msg.remove_flag('FDXA') 1247 self.assertEqual(msg.get_flags(), 'RO') 1248 self._check_sample(msg) 1249 1250 def _check_from(self, msg, sender=None): 1251 # Check contents of "From " line 1252 if sender is None: 1253 sender = "MAILER-DAEMON" 1254 self.assertTrue(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:" 1255 r"\d{2} \d{4}", msg.get_from())) 1256 1257 1258class TestMboxMessage(_TestMboxMMDFMessage): 1259 1260 _factory = mailbox.mboxMessage 1261 1262 1263class TestMHMessage(TestMessage): 1264 1265 _factory = mailbox.MHMessage 1266 1267 def _post_initialize_hook(self, msg): 1268 self.assertEqual(msg._sequences, []) 1269 1270 def test_sequences(self): 1271 # Get, set, join, and leave sequences 1272 msg = mailbox.MHMessage(_sample_message) 1273 self.assertEqual(msg.get_sequences(), []) 1274 msg.set_sequences(['foobar']) 1275 self.assertEqual(msg.get_sequences(), ['foobar']) 1276 msg.set_sequences([]) 1277 self.assertEqual(msg.get_sequences(), []) 1278 msg.add_sequence('unseen') 1279 self.assertEqual(msg.get_sequences(), ['unseen']) 1280 msg.add_sequence('flagged') 1281 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1282 msg.add_sequence('flagged') 1283 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1284 msg.remove_sequence('unseen') 1285 self.assertEqual(msg.get_sequences(), ['flagged']) 1286 msg.add_sequence('foobar') 1287 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1288 msg.remove_sequence('replied') 1289 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1290 msg.set_sequences(['foobar', 'replied']) 1291 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1292 1293 1294class TestBabylMessage(TestMessage): 1295 1296 _factory = mailbox.BabylMessage 1297 1298 def _post_initialize_hook(self, msg): 1299 self.assertEqual(msg._labels, []) 1300 1301 def test_labels(self): 1302 # Get, set, join, and leave labels 1303 msg = mailbox.BabylMessage(_sample_message) 1304 self.assertEqual(msg.get_labels(), []) 1305 msg.set_labels(['foobar']) 1306 self.assertEqual(msg.get_labels(), ['foobar']) 1307 msg.set_labels([]) 1308 self.assertEqual(msg.get_labels(), []) 1309 msg.add_label('filed') 1310 self.assertEqual(msg.get_labels(), ['filed']) 1311 msg.add_label('resent') 1312 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1313 msg.add_label('resent') 1314 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1315 msg.remove_label('filed') 1316 self.assertEqual(msg.get_labels(), ['resent']) 1317 msg.add_label('foobar') 1318 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1319 msg.remove_label('unseen') 1320 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1321 msg.set_labels(['foobar', 'answered']) 1322 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1323 1324 def test_visible(self): 1325 # Get, set, and update visible headers 1326 msg = mailbox.BabylMessage(_sample_message) 1327 visible = msg.get_visible() 1328 self.assertEqual(visible.keys(), []) 1329 self.assertIs(visible.get_payload(), None) 1330 visible['User-Agent'] = 'FooBar 1.0' 1331 visible['X-Whatever'] = 'Blah' 1332 self.assertEqual(msg.get_visible().keys(), []) 1333 msg.set_visible(visible) 1334 visible = msg.get_visible() 1335 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1336 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1337 self.assertEqual(visible['X-Whatever'], 'Blah') 1338 self.assertIs(visible.get_payload(), None) 1339 msg.update_visible() 1340 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1341 self.assertIs(visible.get_payload(), None) 1342 visible = msg.get_visible() 1343 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1344 'Subject']) 1345 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1346 self.assertEqual(visible[header], msg[header]) 1347 1348 1349class TestMMDFMessage(_TestMboxMMDFMessage): 1350 1351 _factory = mailbox.MMDFMessage 1352 1353 1354class TestMessageConversion(TestBase): 1355 1356 def test_plain_to_x(self): 1357 # Convert Message to all formats 1358 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1359 mailbox.mboxMessage, mailbox.MHMessage, 1360 mailbox.BabylMessage, mailbox.MMDFMessage): 1361 msg_plain = mailbox.Message(_sample_message) 1362 msg = class_(msg_plain) 1363 self._check_sample(msg) 1364 1365 def test_x_to_plain(self): 1366 # Convert all formats to Message 1367 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1368 mailbox.mboxMessage, mailbox.MHMessage, 1369 mailbox.BabylMessage, mailbox.MMDFMessage): 1370 msg = class_(_sample_message) 1371 msg_plain = mailbox.Message(msg) 1372 self._check_sample(msg_plain) 1373 1374 def test_x_to_invalid(self): 1375 # Convert all formats to an invalid format 1376 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1377 mailbox.mboxMessage, mailbox.MHMessage, 1378 mailbox.BabylMessage, mailbox.MMDFMessage): 1379 self.assertRaises(TypeError, lambda: class_(False)) 1380 1381 def test_maildir_to_maildir(self): 1382 # Convert MaildirMessage to MaildirMessage 1383 msg_maildir = mailbox.MaildirMessage(_sample_message) 1384 msg_maildir.set_flags('DFPRST') 1385 msg_maildir.set_subdir('cur') 1386 date = msg_maildir.get_date() 1387 msg = mailbox.MaildirMessage(msg_maildir) 1388 self._check_sample(msg) 1389 self.assertEqual(msg.get_flags(), 'DFPRST') 1390 self.assertEqual(msg.get_subdir(), 'cur') 1391 self.assertEqual(msg.get_date(), date) 1392 1393 def test_maildir_to_mboxmmdf(self): 1394 # Convert MaildirMessage to mboxmessage and MMDFMessage 1395 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1396 ('T', 'D'), ('DFPRST', 'RDFA')) 1397 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1398 msg_maildir = mailbox.MaildirMessage(_sample_message) 1399 msg_maildir.set_date(0.0) 1400 for setting, result in pairs: 1401 msg_maildir.set_flags(setting) 1402 msg = class_(msg_maildir) 1403 self.assertEqual(msg.get_flags(), result) 1404 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1405 time.asctime(time.gmtime(0.0))) 1406 msg_maildir.set_subdir('cur') 1407 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1408 1409 def test_maildir_to_mh(self): 1410 # Convert MaildirMessage to MHMessage 1411 msg_maildir = mailbox.MaildirMessage(_sample_message) 1412 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1413 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1414 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1415 for setting, result in pairs: 1416 msg_maildir.set_flags(setting) 1417 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1418 result) 1419 1420 def test_maildir_to_babyl(self): 1421 # Convert MaildirMessage to Babyl 1422 msg_maildir = mailbox.MaildirMessage(_sample_message) 1423 pairs = (('D', ['unseen']), ('F', ['unseen']), 1424 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1425 ('S', []), ('T', ['unseen', 'deleted']), 1426 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1427 for setting, result in pairs: 1428 msg_maildir.set_flags(setting) 1429 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1430 result) 1431 1432 def test_mboxmmdf_to_maildir(self): 1433 # Convert mboxMessage and MMDFMessage to MaildirMessage 1434 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1435 msg_mboxMMDF = class_(_sample_message) 1436 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1437 pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1438 ('RODFA', 'FRST')) 1439 for setting, result in pairs: 1440 msg_mboxMMDF.set_flags(setting) 1441 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1442 self.assertEqual(msg.get_flags(), result) 1443 self.assertEqual(msg.get_date(), 0.0) 1444 msg_mboxMMDF.set_flags('O') 1445 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1446 'cur') 1447 1448 def test_mboxmmdf_to_mboxmmdf(self): 1449 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1450 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1451 msg_mboxMMDF = class_(_sample_message) 1452 msg_mboxMMDF.set_flags('RODFA') 1453 msg_mboxMMDF.set_from('foo@bar') 1454 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1455 msg2 = class2_(msg_mboxMMDF) 1456 self.assertEqual(msg2.get_flags(), 'RODFA') 1457 self.assertEqual(msg2.get_from(), 'foo@bar') 1458 1459 def test_mboxmmdf_to_mh(self): 1460 # Convert mboxMessage and MMDFMessage to MHMessage 1461 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1462 msg_mboxMMDF = class_(_sample_message) 1463 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1464 ('F', ['unseen', 'flagged']), 1465 ('A', ['unseen', 'replied']), 1466 ('RODFA', ['replied', 'flagged'])) 1467 for setting, result in pairs: 1468 msg_mboxMMDF.set_flags(setting) 1469 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1470 result) 1471 1472 def test_mboxmmdf_to_babyl(self): 1473 # Convert mboxMessage and MMDFMessage to BabylMessage 1474 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1475 msg = class_(_sample_message) 1476 pairs = (('R', []), ('O', ['unseen']), 1477 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1478 ('A', ['unseen', 'answered']), 1479 ('RODFA', ['deleted', 'answered'])) 1480 for setting, result in pairs: 1481 msg.set_flags(setting) 1482 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1483 1484 def test_mh_to_maildir(self): 1485 # Convert MHMessage to MaildirMessage 1486 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1487 for setting, result in pairs: 1488 msg = mailbox.MHMessage(_sample_message) 1489 msg.add_sequence(setting) 1490 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1491 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1492 msg = mailbox.MHMessage(_sample_message) 1493 msg.add_sequence('unseen') 1494 msg.add_sequence('replied') 1495 msg.add_sequence('flagged') 1496 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1497 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1498 1499 def test_mh_to_mboxmmdf(self): 1500 # Convert MHMessage to mboxMessage and MMDFMessage 1501 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1502 for setting, result in pairs: 1503 msg = mailbox.MHMessage(_sample_message) 1504 msg.add_sequence(setting) 1505 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1506 self.assertEqual(class_(msg).get_flags(), result) 1507 msg = mailbox.MHMessage(_sample_message) 1508 msg.add_sequence('unseen') 1509 msg.add_sequence('replied') 1510 msg.add_sequence('flagged') 1511 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1512 self.assertEqual(class_(msg).get_flags(), 'OFA') 1513 1514 def test_mh_to_mh(self): 1515 # Convert MHMessage to MHMessage 1516 msg = mailbox.MHMessage(_sample_message) 1517 msg.add_sequence('unseen') 1518 msg.add_sequence('replied') 1519 msg.add_sequence('flagged') 1520 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1521 ['unseen', 'replied', 'flagged']) 1522 1523 def test_mh_to_babyl(self): 1524 # Convert MHMessage to BabylMessage 1525 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1526 ('flagged', [])) 1527 for setting, result in pairs: 1528 msg = mailbox.MHMessage(_sample_message) 1529 msg.add_sequence(setting) 1530 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1531 msg = mailbox.MHMessage(_sample_message) 1532 msg.add_sequence('unseen') 1533 msg.add_sequence('replied') 1534 msg.add_sequence('flagged') 1535 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1536 ['unseen', 'answered']) 1537 1538 def test_babyl_to_maildir(self): 1539 # Convert BabylMessage to MaildirMessage 1540 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1541 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1542 ('resent', 'PS')) 1543 for setting, result in pairs: 1544 msg = mailbox.BabylMessage(_sample_message) 1545 msg.add_label(setting) 1546 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1547 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1548 msg = mailbox.BabylMessage(_sample_message) 1549 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1550 'edited', 'resent'): 1551 msg.add_label(label) 1552 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1553 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1554 1555 def test_babyl_to_mboxmmdf(self): 1556 # Convert BabylMessage to mboxMessage and MMDFMessage 1557 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1558 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1559 ('resent', 'RO')) 1560 for setting, result in pairs: 1561 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1562 msg = mailbox.BabylMessage(_sample_message) 1563 msg.add_label(setting) 1564 self.assertEqual(class_(msg).get_flags(), result) 1565 msg = mailbox.BabylMessage(_sample_message) 1566 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1567 'edited', 'resent'): 1568 msg.add_label(label) 1569 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1570 self.assertEqual(class_(msg).get_flags(), 'ODA') 1571 1572 def test_babyl_to_mh(self): 1573 # Convert BabylMessage to MHMessage 1574 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1575 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1576 ('resent', [])) 1577 for setting, result in pairs: 1578 msg = mailbox.BabylMessage(_sample_message) 1579 msg.add_label(setting) 1580 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1581 msg = mailbox.BabylMessage(_sample_message) 1582 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1583 'edited', 'resent'): 1584 msg.add_label(label) 1585 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1586 ['unseen', 'replied']) 1587 1588 def test_babyl_to_babyl(self): 1589 # Convert BabylMessage to BabylMessage 1590 msg = mailbox.BabylMessage(_sample_message) 1591 msg.update_visible() 1592 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1593 'edited', 'resent'): 1594 msg.add_label(label) 1595 msg2 = mailbox.BabylMessage(msg) 1596 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1597 'answered', 'forwarded', 'edited', 1598 'resent']) 1599 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1600 for key in msg.get_visible().keys(): 1601 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1602 1603 1604class TestProxyFileBase(TestBase): 1605 1606 def _test_read(self, proxy): 1607 # Read by byte 1608 proxy.seek(0) 1609 self.assertEqual(proxy.read(), 'bar') 1610 proxy.seek(1) 1611 self.assertEqual(proxy.read(), 'ar') 1612 proxy.seek(0) 1613 self.assertEqual(proxy.read(2), 'ba') 1614 proxy.seek(1) 1615 self.assertEqual(proxy.read(-1), 'ar') 1616 proxy.seek(2) 1617 self.assertEqual(proxy.read(1000), 'r') 1618 1619 def _test_readline(self, proxy): 1620 # Read by line 1621 proxy.seek(0) 1622 self.assertEqual(proxy.readline(), 'foo' + os.linesep) 1623 self.assertEqual(proxy.readline(), 'bar' + os.linesep) 1624 self.assertEqual(proxy.readline(), 'fred' + os.linesep) 1625 self.assertEqual(proxy.readline(), 'bob') 1626 proxy.seek(2) 1627 self.assertEqual(proxy.readline(), 'o' + os.linesep) 1628 proxy.seek(6 + 2 * len(os.linesep)) 1629 self.assertEqual(proxy.readline(), 'fred' + os.linesep) 1630 proxy.seek(6 + 2 * len(os.linesep)) 1631 self.assertEqual(proxy.readline(2), 'fr') 1632 self.assertEqual(proxy.readline(-10), 'ed' + os.linesep) 1633 1634 def _test_readlines(self, proxy): 1635 # Read multiple lines 1636 proxy.seek(0) 1637 self.assertEqual(proxy.readlines(), ['foo' + os.linesep, 1638 'bar' + os.linesep, 1639 'fred' + os.linesep, 'bob']) 1640 proxy.seek(0) 1641 self.assertEqual(proxy.readlines(2), ['foo' + os.linesep]) 1642 proxy.seek(3 + len(os.linesep)) 1643 self.assertEqual(proxy.readlines(4 + len(os.linesep)), 1644 ['bar' + os.linesep, 'fred' + os.linesep]) 1645 proxy.seek(3) 1646 self.assertEqual(proxy.readlines(1000), [os.linesep, 'bar' + os.linesep, 1647 'fred' + os.linesep, 'bob']) 1648 1649 def _test_iteration(self, proxy): 1650 # Iterate by line 1651 proxy.seek(0) 1652 iterator = iter(proxy) 1653 self.assertEqual(list(iterator), 1654 ['foo' + os.linesep, 'bar' + os.linesep, 'fred' + os.linesep, 'bob']) 1655 1656 def _test_seek_and_tell(self, proxy): 1657 # Seek and use tell to check position 1658 proxy.seek(3) 1659 self.assertEqual(proxy.tell(), 3) 1660 self.assertEqual(proxy.read(len(os.linesep)), os.linesep) 1661 proxy.seek(2, 1) 1662 self.assertEqual(proxy.read(1 + len(os.linesep)), 'r' + os.linesep) 1663 proxy.seek(-3 - len(os.linesep), 2) 1664 self.assertEqual(proxy.read(3), 'bar') 1665 proxy.seek(2, 0) 1666 self.assertEqual(proxy.read(), 'o' + os.linesep + 'bar' + os.linesep) 1667 proxy.seek(100) 1668 self.assertEqual(proxy.read(), '') 1669 1670 def _test_close(self, proxy): 1671 # Close a file 1672 proxy.close() 1673 self.assertRaises(AttributeError, lambda: proxy.close()) 1674 1675 1676class TestProxyFile(TestProxyFileBase): 1677 1678 def setUp(self): 1679 self._path = test_support.TESTFN 1680 self._file = open(self._path, 'wb+') 1681 1682 def tearDown(self): 1683 self._file.close() 1684 self._delete_recursively(self._path) 1685 1686 def test_initialize(self): 1687 # Initialize and check position 1688 self._file.write('foo') 1689 pos = self._file.tell() 1690 proxy0 = mailbox._ProxyFile(self._file) 1691 self.assertEqual(proxy0.tell(), pos) 1692 self.assertEqual(self._file.tell(), pos) 1693 proxy1 = mailbox._ProxyFile(self._file, 0) 1694 self.assertEqual(proxy1.tell(), 0) 1695 self.assertEqual(self._file.tell(), pos) 1696 1697 def test_read(self): 1698 self._file.write('bar') 1699 self._test_read(mailbox._ProxyFile(self._file)) 1700 1701 def test_readline(self): 1702 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1703 os.linesep)) 1704 self._test_readline(mailbox._ProxyFile(self._file)) 1705 1706 def test_readlines(self): 1707 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1708 os.linesep)) 1709 self._test_readlines(mailbox._ProxyFile(self._file)) 1710 1711 def test_iteration(self): 1712 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1713 os.linesep)) 1714 self._test_iteration(mailbox._ProxyFile(self._file)) 1715 1716 def test_seek_and_tell(self): 1717 self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) 1718 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 1719 1720 def test_close(self): 1721 self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) 1722 self._test_close(mailbox._ProxyFile(self._file)) 1723 1724 1725class TestPartialFile(TestProxyFileBase): 1726 1727 def setUp(self): 1728 self._path = test_support.TESTFN 1729 self._file = open(self._path, 'wb+') 1730 1731 def tearDown(self): 1732 self._file.close() 1733 self._delete_recursively(self._path) 1734 1735 def test_initialize(self): 1736 # Initialize and check position 1737 self._file.write('foo' + os.linesep + 'bar') 1738 pos = self._file.tell() 1739 proxy = mailbox._PartialFile(self._file, 2, 5) 1740 self.assertEqual(proxy.tell(), 0) 1741 self.assertEqual(self._file.tell(), pos) 1742 1743 def test_read(self): 1744 self._file.write('***bar***') 1745 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 1746 1747 def test_readline(self): 1748 self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' % 1749 (os.linesep, os.linesep, os.linesep)) 1750 self._test_readline(mailbox._PartialFile(self._file, 5, 1751 18 + 3 * len(os.linesep))) 1752 1753 def test_readlines(self): 1754 self._file.write('foo%sbar%sfred%sbob?????' % 1755 (os.linesep, os.linesep, os.linesep)) 1756 self._test_readlines(mailbox._PartialFile(self._file, 0, 1757 13 + 3 * len(os.linesep))) 1758 1759 def test_iteration(self): 1760 self._file.write('____foo%sbar%sfred%sbob####' % 1761 (os.linesep, os.linesep, os.linesep)) 1762 self._test_iteration(mailbox._PartialFile(self._file, 4, 1763 17 + 3 * len(os.linesep))) 1764 1765 def test_seek_and_tell(self): 1766 self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep)) 1767 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 1768 9 + 2 * len(os.linesep))) 1769 1770 def test_close(self): 1771 self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep)) 1772 self._test_close(mailbox._PartialFile(self._file, 1, 1773 6 + 3 * len(os.linesep))) 1774 1775 1776## Start: tests from the original module (for backward compatibility). 1777 1778FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" 1779DUMMY_MESSAGE = """\ 1780From: some.body@dummy.domain 1781To: me@my.domain 1782Subject: Simple Test 1783 1784This is a dummy message. 1785""" 1786 1787class MaildirTestCase(unittest.TestCase): 1788 1789 def setUp(self): 1790 # create a new maildir mailbox to work with: 1791 self._dir = test_support.TESTFN 1792 os.mkdir(self._dir) 1793 os.mkdir(os.path.join(self._dir, "cur")) 1794 os.mkdir(os.path.join(self._dir, "tmp")) 1795 os.mkdir(os.path.join(self._dir, "new")) 1796 self._counter = 1 1797 self._msgfiles = [] 1798 1799 def tearDown(self): 1800 map(os.unlink, self._msgfiles) 1801 os.rmdir(os.path.join(self._dir, "cur")) 1802 os.rmdir(os.path.join(self._dir, "tmp")) 1803 os.rmdir(os.path.join(self._dir, "new")) 1804 os.rmdir(self._dir) 1805 1806 def createMessage(self, dir, mbox=False): 1807 t = int(time.time() % 1000000) 1808 pid = self._counter 1809 self._counter += 1 1810 filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain")) 1811 tmpname = os.path.join(self._dir, "tmp", filename) 1812 newname = os.path.join(self._dir, dir, filename) 1813 with open(tmpname, "w") as fp: 1814 self._msgfiles.append(tmpname) 1815 if mbox: 1816 fp.write(FROM_) 1817 fp.write(DUMMY_MESSAGE) 1818 if hasattr(os, "link"): 1819 os.link(tmpname, newname) 1820 else: 1821 with open(newname, "w") as fp: 1822 fp.write(DUMMY_MESSAGE) 1823 self._msgfiles.append(newname) 1824 return tmpname 1825 1826 def test_empty_maildir(self): 1827 """Test an empty maildir mailbox""" 1828 # Test for regression on bug #117490: 1829 # Make sure the boxes attribute actually gets set. 1830 self.mbox = mailbox.Maildir(test_support.TESTFN) 1831 #self.assertTrue(hasattr(self.mbox, "boxes")) 1832 #self.assertTrue(len(self.mbox.boxes) == 0) 1833 self.assertIs(self.mbox.next(), None) 1834 self.assertIs(self.mbox.next(), None) 1835 1836 def test_nonempty_maildir_cur(self): 1837 self.createMessage("cur") 1838 self.mbox = mailbox.Maildir(test_support.TESTFN) 1839 #self.assertTrue(len(self.mbox.boxes) == 1) 1840 self.assertIsNot(self.mbox.next(), None) 1841 self.assertIs(self.mbox.next(), None) 1842 self.assertIs(self.mbox.next(), None) 1843 1844 def test_nonempty_maildir_new(self): 1845 self.createMessage("new") 1846 self.mbox = mailbox.Maildir(test_support.TESTFN) 1847 #self.assertTrue(len(self.mbox.boxes) == 1) 1848 self.assertIsNot(self.mbox.next(), None) 1849 self.assertIs(self.mbox.next(), None) 1850 self.assertIs(self.mbox.next(), None) 1851 1852 def test_nonempty_maildir_both(self): 1853 self.createMessage("cur") 1854 self.createMessage("new") 1855 self.mbox = mailbox.Maildir(test_support.TESTFN) 1856 #self.assertTrue(len(self.mbox.boxes) == 2) 1857 self.assertIsNot(self.mbox.next(), None) 1858 self.assertIsNot(self.mbox.next(), None) 1859 self.assertIs(self.mbox.next(), None) 1860 self.assertIs(self.mbox.next(), None) 1861 1862 def test_unix_mbox(self): 1863 ### should be better! 1864 import email.parser 1865 fname = self.createMessage("cur", True) 1866 n = 0 1867 for msg in mailbox.PortableUnixMailbox(open(fname), 1868 email.parser.Parser().parse): 1869 n += 1 1870 self.assertEqual(msg["subject"], "Simple Test") 1871 self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE)) 1872 self.assertEqual(n, 1) 1873 1874## End: classes from the original module (for backward compatibility). 1875 1876 1877_sample_message = """\ 1878Return-Path: <gkj@gregorykjohnson.com> 1879X-Original-To: gkj+person@localhost 1880Delivered-To: gkj+person@localhost 1881Received: from localhost (localhost [127.0.0.1]) 1882 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 1883 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 1884Delivered-To: gkj@sundance.gregorykjohnson.com 1885Received: from localhost [127.0.0.1] 1886 by localhost with POP3 (fetchmail-6.2.5) 1887 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 1888Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 1889 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 1890 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 1891Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 1892 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 1893Date: Wed, 13 Jul 2005 17:23:11 -0400 1894From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> 1895To: gkj@gregorykjohnson.com 1896Subject: Sample message 1897Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> 1898Mime-Version: 1.0 1899Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 1900Content-Disposition: inline 1901User-Agent: Mutt/1.5.9i 1902 1903 1904--NMuMz9nt05w80d4+ 1905Content-Type: text/plain; charset=us-ascii 1906Content-Disposition: inline 1907 1908This is a sample message. 1909 1910-- 1911Gregory K. Johnson 1912 1913--NMuMz9nt05w80d4+ 1914Content-Type: application/octet-stream 1915Content-Disposition: attachment; filename="text.gz" 1916Content-Transfer-Encoding: base64 1917 1918H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 19193FYlAAAA 1920 1921--NMuMz9nt05w80d4+-- 1922""" 1923 1924_sample_headers = { 1925 "Return-Path":"<gkj@gregorykjohnson.com>", 1926 "X-Original-To":"gkj+person@localhost", 1927 "Delivered-To":"gkj+person@localhost", 1928 "Received":"""from localhost (localhost [127.0.0.1]) 1929 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 1930 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 1931 "Delivered-To":"gkj@sundance.gregorykjohnson.com", 1932 "Received":"""from localhost [127.0.0.1] 1933 by localhost with POP3 (fetchmail-6.2.5) 1934 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 1935 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 1936 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 1937 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 1938 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 1939 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 1940 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 1941 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", 1942 "To":"gkj@gregorykjohnson.com", 1943 "Subject":"Sample message", 1944 "Mime-Version":"1.0", 1945 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 1946 "Content-Disposition":"inline", 1947 "User-Agent": "Mutt/1.5.9i" } 1948 1949_sample_payloads = ("""This is a sample message. 1950 1951-- 1952Gregory K. Johnson 1953""", 1954"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 19553FYlAAAA 1956""") 1957 1958 1959def test_main(): 1960 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH, 1961 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage, 1962 TestMHMessage, TestBabylMessage, TestMMDFMessage, 1963 TestMessageConversion, TestProxyFile, TestPartialFile, 1964 MaildirTestCase) 1965 test_support.run_unittest(*tests) 1966 test_support.reap_children() 1967 1968 1969if __name__ == '__main__': 1970 test_main() 1971