1import os 2import sys 3import time 4import stat 5import socket 6import email 7import email.message 8import re 9import io 10import tempfile 11from test import support 12import unittest 13import textwrap 14import mailbox 15import glob 16 17 18class TestBase: 19 20 all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage, 21 mailbox.mboxMessage, mailbox.MHMessage, 22 mailbox.BabylMessage, mailbox.MMDFMessage) 23 24 def _check_sample(self, msg): 25 # Inspect a mailbox.Message representation of the sample message 26 self.assertIsInstance(msg, email.message.Message) 27 self.assertIsInstance(msg, mailbox.Message) 28 for key, value in _sample_headers.items(): 29 self.assertIn(value, msg.get_all(key)) 30 self.assertTrue(msg.is_multipart()) 31 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 32 for i, payload in enumerate(_sample_payloads): 33 part = msg.get_payload(i) 34 self.assertIsInstance(part, email.message.Message) 35 self.assertNotIsInstance(part, mailbox.Message) 36 self.assertEqual(part.get_payload(), payload) 37 38 def _delete_recursively(self, target): 39 # Delete a file or delete a directory recursively 40 if os.path.isdir(target): 41 support.rmtree(target) 42 elif os.path.exists(target): 43 support.unlink(target) 44 45 46class TestMailbox(TestBase): 47 48 maxDiff = None 49 50 _factory = None # Overridden by subclasses to reuse tests 51 _template = 'From: foo\n\n%s\n' 52 53 def setUp(self): 54 self._path = support.TESTFN 55 self._delete_recursively(self._path) 56 self._box = self._factory(self._path) 57 58 def tearDown(self): 59 self._box.close() 60 self._delete_recursively(self._path) 61 62 def test_add(self): 63 # Add copies of a sample message 64 keys = [] 65 keys.append(self._box.add(self._template % 0)) 66 self.assertEqual(len(self._box), 1) 67 keys.append(self._box.add(mailbox.Message(_sample_message))) 68 self.assertEqual(len(self._box), 2) 69 keys.append(self._box.add(email.message_from_string(_sample_message))) 70 self.assertEqual(len(self._box), 3) 71 keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) 72 self.assertEqual(len(self._box), 4) 73 keys.append(self._box.add(_sample_message)) 74 self.assertEqual(len(self._box), 5) 75 keys.append(self._box.add(_bytes_sample_message)) 76 self.assertEqual(len(self._box), 6) 77 with self.assertWarns(DeprecationWarning): 78 keys.append(self._box.add( 79 io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) 80 self.assertEqual(len(self._box), 7) 81 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 82 for i in (1, 2, 3, 4, 5, 6): 83 self._check_sample(self._box[keys[i]]) 84 85 _nonascii_msg = textwrap.dedent("""\ 86 From: foo 87 Subject: Falinaptár házhozszállítással. Már rendeltél? 88 89 0 90 """) 91 92 def test_add_invalid_8bit_bytes_header(self): 93 key = self._box.add(self._nonascii_msg.encode('latin-1')) 94 self.assertEqual(len(self._box), 1) 95 self.assertEqual(self._box.get_bytes(key), 96 self._nonascii_msg.encode('latin-1')) 97 98 def test_invalid_nonascii_header_as_string(self): 99 subj = self._nonascii_msg.splitlines()[1] 100 key = self._box.add(subj.encode('latin-1')) 101 self.assertEqual(self._box.get_string(key), 102 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 103 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') 104 105 def test_add_nonascii_string_header_raises(self): 106 with self.assertRaisesRegex(ValueError, "ASCII-only"): 107 self._box.add(self._nonascii_msg) 108 self._box.flush() 109 self.assertEqual(len(self._box), 0) 110 self.assertMailboxEmpty() 111 112 def test_add_that_raises_leaves_mailbox_empty(self): 113 def raiser(*args, **kw): 114 raise Exception("a fake error") 115 support.patch(self, email.generator.BytesGenerator, 'flatten', raiser) 116 with self.assertRaises(Exception): 117 self._box.add(email.message_from_string("From: Alphöso")) 118 self.assertEqual(len(self._box), 0) 119 self._box.close() 120 self.assertMailboxEmpty() 121 122 _non_latin_bin_msg = textwrap.dedent("""\ 123 From: foo@bar.com 124 To: báz 125 Subject: Maintenant je vous présente mon collègue, le pouf célèbre 126 \tJean de Baddie 127 Mime-Version: 1.0 128 Content-Type: text/plain; charset="utf-8" 129 Content-Transfer-Encoding: 8bit 130 131 Да, они летят. 132 """).encode('utf-8') 133 134 def test_add_8bit_body(self): 135 key = self._box.add(self._non_latin_bin_msg) 136 self.assertEqual(self._box.get_bytes(key), 137 self._non_latin_bin_msg) 138 with self._box.get_file(key) as f: 139 self.assertEqual(f.read(), 140 self._non_latin_bin_msg.replace(b'\n', 141 os.linesep.encode())) 142 self.assertEqual(self._box[key].get_payload(), 143 "Да, они летят.\n") 144 145 def test_add_binary_file(self): 146 with tempfile.TemporaryFile('wb+') as f: 147 f.write(_bytes_sample_message) 148 f.seek(0) 149 key = self._box.add(f) 150 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 151 _bytes_sample_message.split(b'\n')) 152 153 def test_add_binary_nonascii_file(self): 154 with tempfile.TemporaryFile('wb+') as f: 155 f.write(self._non_latin_bin_msg) 156 f.seek(0) 157 key = self._box.add(f) 158 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 159 self._non_latin_bin_msg.split(b'\n')) 160 161 def test_add_text_file_warns(self): 162 with tempfile.TemporaryFile('w+') as f: 163 f.write(_sample_message) 164 f.seek(0) 165 with self.assertWarns(DeprecationWarning): 166 key = self._box.add(f) 167 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 168 _bytes_sample_message.split(b'\n')) 169 170 def test_add_StringIO_warns(self): 171 with self.assertWarns(DeprecationWarning): 172 key = self._box.add(io.StringIO(self._template % "0")) 173 self.assertEqual(self._box.get_string(key), self._template % "0") 174 175 def test_add_nonascii_StringIO_raises(self): 176 with self.assertWarns(DeprecationWarning): 177 with self.assertRaisesRegex(ValueError, "ASCII-only"): 178 self._box.add(io.StringIO(self._nonascii_msg)) 179 self.assertEqual(len(self._box), 0) 180 self._box.close() 181 self.assertMailboxEmpty() 182 183 def test_remove(self): 184 # Remove messages using remove() 185 self._test_remove_or_delitem(self._box.remove) 186 187 def test_delitem(self): 188 # Remove messages using __delitem__() 189 self._test_remove_or_delitem(self._box.__delitem__) 190 191 def _test_remove_or_delitem(self, method): 192 # (Used by test_remove() and test_delitem().) 193 key0 = self._box.add(self._template % 0) 194 key1 = self._box.add(self._template % 1) 195 self.assertEqual(len(self._box), 2) 196 method(key0) 197 self.assertEqual(len(self._box), 1) 198 self.assertRaises(KeyError, lambda: self._box[key0]) 199 self.assertRaises(KeyError, lambda: method(key0)) 200 self.assertEqual(self._box.get_string(key1), self._template % 1) 201 key2 = self._box.add(self._template % 2) 202 self.assertEqual(len(self._box), 2) 203 method(key2) 204 self.assertEqual(len(self._box), 1) 205 self.assertRaises(KeyError, lambda: self._box[key2]) 206 self.assertRaises(KeyError, lambda: method(key2)) 207 self.assertEqual(self._box.get_string(key1), self._template % 1) 208 method(key1) 209 self.assertEqual(len(self._box), 0) 210 self.assertRaises(KeyError, lambda: self._box[key1]) 211 self.assertRaises(KeyError, lambda: method(key1)) 212 213 def test_discard(self, repetitions=10): 214 # Discard messages 215 key0 = self._box.add(self._template % 0) 216 key1 = self._box.add(self._template % 1) 217 self.assertEqual(len(self._box), 2) 218 self._box.discard(key0) 219 self.assertEqual(len(self._box), 1) 220 self.assertRaises(KeyError, lambda: self._box[key0]) 221 self._box.discard(key0) 222 self.assertEqual(len(self._box), 1) 223 self.assertRaises(KeyError, lambda: self._box[key0]) 224 225 def test_get(self): 226 # Retrieve messages using get() 227 key0 = self._box.add(self._template % 0) 228 msg = self._box.get(key0) 229 self.assertEqual(msg['from'], 'foo') 230 self.assertEqual(msg.get_payload(), '0\n') 231 self.assertIsNone(self._box.get('foo')) 232 self.assertIs(self._box.get('foo', False), False) 233 self._box.close() 234 self._box = self._factory(self._path) 235 key1 = self._box.add(self._template % 1) 236 msg = self._box.get(key1) 237 self.assertEqual(msg['from'], 'foo') 238 self.assertEqual(msg.get_payload(), '1\n') 239 240 def test_getitem(self): 241 # Retrieve message using __getitem__() 242 key0 = self._box.add(self._template % 0) 243 msg = self._box[key0] 244 self.assertEqual(msg['from'], 'foo') 245 self.assertEqual(msg.get_payload(), '0\n') 246 self.assertRaises(KeyError, lambda: self._box['foo']) 247 self._box.discard(key0) 248 self.assertRaises(KeyError, lambda: self._box[key0]) 249 250 def test_get_message(self): 251 # Get Message representations of messages 252 key0 = self._box.add(self._template % 0) 253 key1 = self._box.add(_sample_message) 254 msg0 = self._box.get_message(key0) 255 self.assertIsInstance(msg0, mailbox.Message) 256 self.assertEqual(msg0['from'], 'foo') 257 self.assertEqual(msg0.get_payload(), '0\n') 258 self._check_sample(self._box.get_message(key1)) 259 260 def test_get_bytes(self): 261 # Get bytes representations of messages 262 key0 = self._box.add(self._template % 0) 263 key1 = self._box.add(_sample_message) 264 self.assertEqual(self._box.get_bytes(key0), 265 (self._template % 0).encode('ascii')) 266 self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) 267 268 def test_get_string(self): 269 # Get string representations of messages 270 key0 = self._box.add(self._template % 0) 271 key1 = self._box.add(_sample_message) 272 self.assertEqual(self._box.get_string(key0), self._template % 0) 273 self.assertEqual(self._box.get_string(key1).split('\n'), 274 _sample_message.split('\n')) 275 276 def test_get_file(self): 277 # Get file representations of messages 278 key0 = self._box.add(self._template % 0) 279 key1 = self._box.add(_sample_message) 280 with self._box.get_file(key0) as file: 281 data0 = file.read() 282 with self._box.get_file(key1) as file: 283 data1 = file.read() 284 self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), 285 self._template % 0) 286 self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), 287 _sample_message) 288 289 def test_get_file_can_be_closed_twice(self): 290 # Issue 11700 291 key = self._box.add(_sample_message) 292 f = self._box.get_file(key) 293 f.close() 294 f.close() 295 296 def test_iterkeys(self): 297 # Get keys using iterkeys() 298 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 299 300 def test_keys(self): 301 # Get keys using keys() 302 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 303 304 def test_itervalues(self): 305 # Get values using itervalues() 306 self._check_iteration(self._box.itervalues, do_keys=False, 307 do_values=True) 308 309 def test_iter(self): 310 # Get values using __iter__() 311 self._check_iteration(self._box.__iter__, do_keys=False, 312 do_values=True) 313 314 def test_values(self): 315 # Get values using values() 316 self._check_iteration(self._box.values, do_keys=False, do_values=True) 317 318 def test_iteritems(self): 319 # Get keys and values using iteritems() 320 self._check_iteration(self._box.iteritems, do_keys=True, 321 do_values=True) 322 323 def test_items(self): 324 # Get keys and values using items() 325 self._check_iteration(self._box.items, do_keys=True, do_values=True) 326 327 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 328 for value in method(): 329 self.fail("Not empty") 330 keys, values = [], [] 331 for i in range(repetitions): 332 keys.append(self._box.add(self._template % i)) 333 values.append(self._template % i) 334 if do_keys and not do_values: 335 returned_keys = list(method()) 336 elif do_values and not do_keys: 337 returned_values = list(method()) 338 else: 339 returned_keys, returned_values = [], [] 340 for key, value in method(): 341 returned_keys.append(key) 342 returned_values.append(value) 343 if do_keys: 344 self.assertEqual(len(keys), len(returned_keys)) 345 self.assertEqual(set(keys), set(returned_keys)) 346 if do_values: 347 count = 0 348 for value in returned_values: 349 self.assertEqual(value['from'], 'foo') 350 self.assertLess(int(value.get_payload()), repetitions) 351 count += 1 352 self.assertEqual(len(values), count) 353 354 def test_contains(self): 355 # Check existence of keys using __contains__() 356 self.assertNotIn('foo', self._box) 357 key0 = self._box.add(self._template % 0) 358 self.assertIn(key0, self._box) 359 self.assertNotIn('foo', self._box) 360 key1 = self._box.add(self._template % 1) 361 self.assertIn(key1, self._box) 362 self.assertIn(key0, self._box) 363 self.assertNotIn('foo', self._box) 364 self._box.remove(key0) 365 self.assertNotIn(key0, self._box) 366 self.assertIn(key1, self._box) 367 self.assertNotIn('foo', self._box) 368 self._box.remove(key1) 369 self.assertNotIn(key1, self._box) 370 self.assertNotIn(key0, self._box) 371 self.assertNotIn('foo', self._box) 372 373 def test_len(self, repetitions=10): 374 # Get message count 375 keys = [] 376 for i in range(repetitions): 377 self.assertEqual(len(self._box), i) 378 keys.append(self._box.add(self._template % i)) 379 self.assertEqual(len(self._box), i + 1) 380 for i in range(repetitions): 381 self.assertEqual(len(self._box), repetitions - i) 382 self._box.remove(keys[i]) 383 self.assertEqual(len(self._box), repetitions - i - 1) 384 385 def test_set_item(self): 386 # Modify messages using __setitem__() 387 key0 = self._box.add(self._template % 'original 0') 388 self.assertEqual(self._box.get_string(key0), 389 self._template % 'original 0') 390 key1 = self._box.add(self._template % 'original 1') 391 self.assertEqual(self._box.get_string(key1), 392 self._template % 'original 1') 393 self._box[key0] = self._template % 'changed 0' 394 self.assertEqual(self._box.get_string(key0), 395 self._template % 'changed 0') 396 self._box[key1] = self._template % 'changed 1' 397 self.assertEqual(self._box.get_string(key1), 398 self._template % 'changed 1') 399 self._box[key0] = _sample_message 400 self._check_sample(self._box[key0]) 401 self._box[key1] = self._box[key0] 402 self._check_sample(self._box[key1]) 403 self._box[key0] = self._template % 'original 0' 404 self.assertEqual(self._box.get_string(key0), 405 self._template % 'original 0') 406 self._check_sample(self._box[key1]) 407 self.assertRaises(KeyError, 408 lambda: self._box.__setitem__('foo', 'bar')) 409 self.assertRaises(KeyError, lambda: self._box['foo']) 410 self.assertEqual(len(self._box), 2) 411 412 def test_clear(self, iterations=10): 413 # Remove all messages using clear() 414 keys = [] 415 for i in range(iterations): 416 self._box.add(self._template % i) 417 for i, key in enumerate(keys): 418 self.assertEqual(self._box.get_string(key), self._template % i) 419 self._box.clear() 420 self.assertEqual(len(self._box), 0) 421 for i, key in enumerate(keys): 422 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 423 424 def test_pop(self): 425 # Get and remove a message using pop() 426 key0 = self._box.add(self._template % 0) 427 self.assertIn(key0, self._box) 428 key1 = self._box.add(self._template % 1) 429 self.assertIn(key1, self._box) 430 self.assertEqual(self._box.pop(key0).get_payload(), '0\n') 431 self.assertNotIn(key0, self._box) 432 self.assertIn(key1, self._box) 433 key2 = self._box.add(self._template % 2) 434 self.assertIn(key2, self._box) 435 self.assertEqual(self._box.pop(key2).get_payload(), '2\n') 436 self.assertNotIn(key2, self._box) 437 self.assertIn(key1, self._box) 438 self.assertEqual(self._box.pop(key1).get_payload(), '1\n') 439 self.assertNotIn(key1, self._box) 440 self.assertEqual(len(self._box), 0) 441 442 def test_popitem(self, iterations=10): 443 # Get and remove an arbitrary (key, message) using popitem() 444 keys = [] 445 for i in range(10): 446 keys.append(self._box.add(self._template % i)) 447 seen = [] 448 for i in range(10): 449 key, msg = self._box.popitem() 450 self.assertIn(key, keys) 451 self.assertNotIn(key, seen) 452 seen.append(key) 453 self.assertEqual(int(msg.get_payload()), keys.index(key)) 454 self.assertEqual(len(self._box), 0) 455 for key in keys: 456 self.assertRaises(KeyError, lambda: self._box[key]) 457 458 def test_update(self): 459 # Modify multiple messages using update() 460 key0 = self._box.add(self._template % 'original 0') 461 key1 = self._box.add(self._template % 'original 1') 462 key2 = self._box.add(self._template % 'original 2') 463 self._box.update({key0: self._template % 'changed 0', 464 key2: _sample_message}) 465 self.assertEqual(len(self._box), 3) 466 self.assertEqual(self._box.get_string(key0), 467 self._template % 'changed 0') 468 self.assertEqual(self._box.get_string(key1), 469 self._template % 'original 1') 470 self._check_sample(self._box[key2]) 471 self._box.update([(key2, self._template % 'changed 2'), 472 (key1, self._template % 'changed 1'), 473 (key0, self._template % 'original 0')]) 474 self.assertEqual(len(self._box), 3) 475 self.assertEqual(self._box.get_string(key0), 476 self._template % 'original 0') 477 self.assertEqual(self._box.get_string(key1), 478 self._template % 'changed 1') 479 self.assertEqual(self._box.get_string(key2), 480 self._template % 'changed 2') 481 self.assertRaises(KeyError, 482 lambda: self._box.update({'foo': 'bar', 483 key0: self._template % "changed 0"})) 484 self.assertEqual(len(self._box), 3) 485 self.assertEqual(self._box.get_string(key0), 486 self._template % "changed 0") 487 self.assertEqual(self._box.get_string(key1), 488 self._template % "changed 1") 489 self.assertEqual(self._box.get_string(key2), 490 self._template % "changed 2") 491 492 def test_flush(self): 493 # Write changes to disk 494 self._test_flush_or_close(self._box.flush, True) 495 496 def test_popitem_and_flush_twice(self): 497 # See #15036. 498 self._box.add(self._template % 0) 499 self._box.add(self._template % 1) 500 self._box.flush() 501 502 self._box.popitem() 503 self._box.flush() 504 self._box.popitem() 505 self._box.flush() 506 507 def test_lock_unlock(self): 508 # Lock and unlock the mailbox 509 self.assertFalse(os.path.exists(self._get_lock_path())) 510 self._box.lock() 511 self.assertTrue(os.path.exists(self._get_lock_path())) 512 self._box.unlock() 513 self.assertFalse(os.path.exists(self._get_lock_path())) 514 515 def test_close(self): 516 # Close mailbox and flush changes to disk 517 self._test_flush_or_close(self._box.close, False) 518 519 def _test_flush_or_close(self, method, should_call_close): 520 contents = [self._template % i for i in range(3)] 521 self._box.add(contents[0]) 522 self._box.add(contents[1]) 523 self._box.add(contents[2]) 524 oldbox = self._box 525 method() 526 if should_call_close: 527 self._box.close() 528 self._box = self._factory(self._path) 529 keys = self._box.keys() 530 self.assertEqual(len(keys), 3) 531 for key in keys: 532 self.assertIn(self._box.get_string(key), contents) 533 oldbox.close() 534 535 def test_dump_message(self): 536 # Write message representations to disk 537 for input in (email.message_from_string(_sample_message), 538 _sample_message, io.BytesIO(_bytes_sample_message)): 539 output = io.BytesIO() 540 self._box._dump_message(input, output) 541 self.assertEqual(output.getvalue(), 542 _bytes_sample_message.replace(b'\n', os.linesep.encode())) 543 output = io.BytesIO() 544 self.assertRaises(TypeError, 545 lambda: self._box._dump_message(None, output)) 546 547 def _get_lock_path(self): 548 # Return the path of the dot lock file. May be overridden. 549 return self._path + '.lock' 550 551 552class TestMailboxSuperclass(TestBase, unittest.TestCase): 553 554 def test_notimplemented(self): 555 # Test that all Mailbox methods raise NotImplementedException. 556 box = mailbox.Mailbox('path') 557 self.assertRaises(NotImplementedError, lambda: box.add('')) 558 self.assertRaises(NotImplementedError, lambda: box.remove('')) 559 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 560 self.assertRaises(NotImplementedError, lambda: box.discard('')) 561 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 562 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 563 self.assertRaises(NotImplementedError, lambda: box.keys()) 564 self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__()) 565 self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) 566 self.assertRaises(NotImplementedError, lambda: box.values()) 567 self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__()) 568 self.assertRaises(NotImplementedError, lambda: box.items()) 569 self.assertRaises(NotImplementedError, lambda: box.get('')) 570 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 571 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 572 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 573 self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) 574 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 575 self.assertRaises(NotImplementedError, lambda: '' in box) 576 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 577 self.assertRaises(NotImplementedError, lambda: box.__len__()) 578 self.assertRaises(NotImplementedError, lambda: box.clear()) 579 self.assertRaises(NotImplementedError, lambda: box.pop('')) 580 self.assertRaises(NotImplementedError, lambda: box.popitem()) 581 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 582 self.assertRaises(NotImplementedError, lambda: box.flush()) 583 self.assertRaises(NotImplementedError, lambda: box.lock()) 584 self.assertRaises(NotImplementedError, lambda: box.unlock()) 585 self.assertRaises(NotImplementedError, lambda: box.close()) 586 587 588class TestMaildir(TestMailbox, unittest.TestCase): 589 590 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 591 592 def setUp(self): 593 TestMailbox.setUp(self) 594 if (os.name == 'nt') or (sys.platform == 'cygwin'): 595 self._box.colon = '!' 596 597 def assertMailboxEmpty(self): 598 self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) 599 600 def test_add_MM(self): 601 # Add a MaildirMessage instance 602 msg = mailbox.MaildirMessage(self._template % 0) 603 msg.set_subdir('cur') 604 msg.set_info('foo') 605 key = self._box.add(msg) 606 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 607 (key, self._box.colon)))) 608 609 def test_get_MM(self): 610 # Get a MaildirMessage instance 611 msg = mailbox.MaildirMessage(self._template % 0) 612 msg.set_subdir('cur') 613 msg.set_flags('RF') 614 key = self._box.add(msg) 615 msg_returned = self._box.get_message(key) 616 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 617 self.assertEqual(msg_returned.get_subdir(), 'cur') 618 self.assertEqual(msg_returned.get_flags(), 'FR') 619 620 def test_set_MM(self): 621 # Set with a MaildirMessage instance 622 msg0 = mailbox.MaildirMessage(self._template % 0) 623 msg0.set_flags('TP') 624 key = self._box.add(msg0) 625 msg_returned = self._box.get_message(key) 626 self.assertEqual(msg_returned.get_subdir(), 'new') 627 self.assertEqual(msg_returned.get_flags(), 'PT') 628 msg1 = mailbox.MaildirMessage(self._template % 1) 629 self._box[key] = msg1 630 msg_returned = self._box.get_message(key) 631 self.assertEqual(msg_returned.get_subdir(), 'new') 632 self.assertEqual(msg_returned.get_flags(), '') 633 self.assertEqual(msg_returned.get_payload(), '1\n') 634 msg2 = mailbox.MaildirMessage(self._template % 2) 635 msg2.set_info('2,S') 636 self._box[key] = msg2 637 self._box[key] = self._template % 3 638 msg_returned = self._box.get_message(key) 639 self.assertEqual(msg_returned.get_subdir(), 'new') 640 self.assertEqual(msg_returned.get_flags(), 'S') 641 self.assertEqual(msg_returned.get_payload(), '3\n') 642 643 def test_consistent_factory(self): 644 # Add a message. 645 msg = mailbox.MaildirMessage(self._template % 0) 646 msg.set_subdir('cur') 647 msg.set_flags('RF') 648 key = self._box.add(msg) 649 650 # Create new mailbox with 651 class FakeMessage(mailbox.MaildirMessage): 652 pass 653 box = mailbox.Maildir(self._path, factory=FakeMessage) 654 box.colon = self._box.colon 655 msg2 = box.get_message(key) 656 self.assertIsInstance(msg2, FakeMessage) 657 658 def test_initialize_new(self): 659 # Initialize a non-existent mailbox 660 self.tearDown() 661 self._box = mailbox.Maildir(self._path) 662 self._check_basics() 663 self._delete_recursively(self._path) 664 self._box = self._factory(self._path, factory=None) 665 self._check_basics() 666 667 def test_initialize_existing(self): 668 # Initialize an existing mailbox 669 self.tearDown() 670 for subdir in '', 'tmp', 'new', 'cur': 671 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 672 self._box = mailbox.Maildir(self._path) 673 self._check_basics() 674 675 def _check_basics(self, factory=None): 676 # (Used by test_open_new() and test_open_existing().) 677 self.assertEqual(self._box._path, os.path.abspath(self._path)) 678 self.assertEqual(self._box._factory, factory) 679 for subdir in '', 'tmp', 'new', 'cur': 680 path = os.path.join(self._path, subdir) 681 mode = os.stat(path)[stat.ST_MODE] 682 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 683 684 def test_list_folders(self): 685 # List folders 686 self._box.add_folder('one') 687 self._box.add_folder('two') 688 self._box.add_folder('three') 689 self.assertEqual(len(self._box.list_folders()), 3) 690 self.assertEqual(set(self._box.list_folders()), 691 set(('one', 'two', 'three'))) 692 693 def test_get_folder(self): 694 # Open folders 695 self._box.add_folder('foo.bar') 696 folder0 = self._box.get_folder('foo.bar') 697 folder0.add(self._template % 'bar') 698 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 699 folder1 = self._box.get_folder('foo.bar') 700 self.assertEqual(folder1.get_string(folder1.keys()[0]), 701 self._template % 'bar') 702 703 def test_add_and_remove_folders(self): 704 # Delete folders 705 self._box.add_folder('one') 706 self._box.add_folder('two') 707 self.assertEqual(len(self._box.list_folders()), 2) 708 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 709 self._box.remove_folder('one') 710 self.assertEqual(len(self._box.list_folders()), 1) 711 self.assertEqual(set(self._box.list_folders()), set(('two',))) 712 self._box.add_folder('three') 713 self.assertEqual(len(self._box.list_folders()), 2) 714 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 715 self._box.remove_folder('three') 716 self.assertEqual(len(self._box.list_folders()), 1) 717 self.assertEqual(set(self._box.list_folders()), set(('two',))) 718 self._box.remove_folder('two') 719 self.assertEqual(len(self._box.list_folders()), 0) 720 self.assertEqual(self._box.list_folders(), []) 721 722 def test_clean(self): 723 # Remove old files from 'tmp' 724 foo_path = os.path.join(self._path, 'tmp', 'foo') 725 bar_path = os.path.join(self._path, 'tmp', 'bar') 726 with open(foo_path, 'w') as f: 727 f.write("@") 728 with open(bar_path, 'w') as f: 729 f.write("@") 730 self._box.clean() 731 self.assertTrue(os.path.exists(foo_path)) 732 self.assertTrue(os.path.exists(bar_path)) 733 foo_stat = os.stat(foo_path) 734 os.utime(foo_path, (time.time() - 129600 - 2, 735 foo_stat.st_mtime)) 736 self._box.clean() 737 self.assertFalse(os.path.exists(foo_path)) 738 self.assertTrue(os.path.exists(bar_path)) 739 740 def test_create_tmp(self, repetitions=10): 741 # Create files in tmp directory 742 hostname = socket.gethostname() 743 if '/' in hostname: 744 hostname = hostname.replace('/', r'\057') 745 if ':' in hostname: 746 hostname = hostname.replace(':', r'\072') 747 pid = os.getpid() 748 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 749 r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)") 750 previous_groups = None 751 for x in range(repetitions): 752 tmp_file = self._box._create_tmp() 753 head, tail = os.path.split(tmp_file.name) 754 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 755 "tmp")), 756 "File in wrong location: '%s'" % head) 757 match = pattern.match(tail) 758 self.assertIsNotNone(match, "Invalid file name: '%s'" % tail) 759 groups = match.groups() 760 if previous_groups is not None: 761 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]), 762 "Non-monotonic seconds: '%s' before '%s'" % 763 (previous_groups[0], groups[0])) 764 if int(groups[0]) == int(previous_groups[0]): 765 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]), 766 "Non-monotonic milliseconds: '%s' before '%s'" % 767 (previous_groups[1], groups[1])) 768 self.assertEqual(int(groups[2]), pid, 769 "Process ID mismatch: '%s' should be '%s'" % 770 (groups[2], pid)) 771 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1, 772 "Non-sequential counter: '%s' before '%s'" % 773 (previous_groups[3], groups[3])) 774 self.assertEqual(groups[4], hostname, 775 "Host name mismatch: '%s' should be '%s'" % 776 (groups[4], hostname)) 777 previous_groups = groups 778 tmp_file.write(_bytes_sample_message) 779 tmp_file.seek(0) 780 self.assertEqual(tmp_file.read(), _bytes_sample_message) 781 tmp_file.close() 782 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 783 self.assertEqual(file_count, repetitions, 784 "Wrong file count: '%s' should be '%s'" % 785 (file_count, repetitions)) 786 787 def test_refresh(self): 788 # Update the table of contents 789 self.assertEqual(self._box._toc, {}) 790 key0 = self._box.add(self._template % 0) 791 key1 = self._box.add(self._template % 1) 792 self.assertEqual(self._box._toc, {}) 793 self._box._refresh() 794 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 795 key1: os.path.join('new', key1)}) 796 key2 = self._box.add(self._template % 2) 797 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 798 key1: os.path.join('new', key1)}) 799 self._box._refresh() 800 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 801 key1: os.path.join('new', key1), 802 key2: os.path.join('new', key2)}) 803 804 def test_refresh_after_safety_period(self): 805 # Issue #13254: Call _refresh after the "file system safety 806 # period" of 2 seconds has passed; _toc should still be 807 # updated because this is the first call to _refresh. 808 key0 = self._box.add(self._template % 0) 809 key1 = self._box.add(self._template % 1) 810 811 self._box = self._factory(self._path) 812 self.assertEqual(self._box._toc, {}) 813 814 # Emulate sleeping. Instead of sleeping for 2 seconds, use the 815 # skew factor to make _refresh think that the filesystem 816 # safety period has passed and re-reading the _toc is only 817 # required if mtimes differ. 818 self._box._skewfactor = -3 819 820 self._box._refresh() 821 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1])) 822 823 def test_lookup(self): 824 # Look up message subpaths in the TOC 825 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 826 key0 = self._box.add(self._template % 0) 827 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 828 os.remove(os.path.join(self._path, 'new', key0)) 829 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 830 # Be sure that the TOC is read back from disk (see issue #6896 831 # about bad mtime behaviour on some systems). 832 self._box.flush() 833 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 834 self.assertEqual(self._box._toc, {}) 835 836 def test_lock_unlock(self): 837 # Lock and unlock the mailbox. For Maildir, this does nothing. 838 self._box.lock() 839 self._box.unlock() 840 841 def test_folder (self): 842 # Test for bug #1569790: verify that folders returned by .get_folder() 843 # use the same factory function. 844 def dummy_factory (s): 845 return None 846 box = self._factory(self._path, factory=dummy_factory) 847 folder = box.add_folder('folder1') 848 self.assertIs(folder._factory, dummy_factory) 849 850 folder1_alias = box.get_folder('folder1') 851 self.assertIs(folder1_alias._factory, dummy_factory) 852 853 def test_directory_in_folder (self): 854 # Test that mailboxes still work if there's a stray extra directory 855 # in a folder. 856 for i in range(10): 857 self._box.add(mailbox.Message(_sample_message)) 858 859 # Create a stray directory 860 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 861 862 # Check that looping still works with the directory present. 863 for msg in self._box: 864 pass 865 866 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 867 def test_file_permissions(self): 868 # Verify that message files are created without execute permissions 869 msg = mailbox.MaildirMessage(self._template % 0) 870 orig_umask = os.umask(0) 871 try: 872 key = self._box.add(msg) 873 finally: 874 os.umask(orig_umask) 875 path = os.path.join(self._path, self._box._lookup(key)) 876 mode = os.stat(path).st_mode 877 self.assertFalse(mode & 0o111) 878 879 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 880 def test_folder_file_perms(self): 881 # From bug #3228, we want to verify that the file created inside a Maildir 882 # subfolder isn't marked as executable. 883 orig_umask = os.umask(0) 884 try: 885 subfolder = self._box.add_folder('subfolder') 886 finally: 887 os.umask(orig_umask) 888 889 path = os.path.join(subfolder._path, 'maildirfolder') 890 st = os.stat(path) 891 perms = st.st_mode 892 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 893 894 def test_reread(self): 895 # Do an initial unconditional refresh 896 self._box._refresh() 897 898 # Put the last modified times more than two seconds into the past 899 # (because mtime may have a two second granularity) 900 for subdir in ('cur', 'new'): 901 os.utime(os.path.join(self._box._path, subdir), 902 (time.time()-5,)*2) 903 904 # Because mtime has a two second granularity in worst case (FAT), a 905 # refresh is done unconditionally if called for within 906 # two-second-plus-a-bit of the last one, just in case the mbox has 907 # changed; so now we have to wait for that interval to expire. 908 # 909 # Because this is a test, emulate sleeping. Instead of 910 # sleeping for 2 seconds, use the skew factor to make _refresh 911 # think that 2 seconds have passed and re-reading the _toc is 912 # only required if mtimes differ. 913 self._box._skewfactor = -3 914 915 # Re-reading causes the ._toc attribute to be assigned a new dictionary 916 # object, so we'll check that the ._toc attribute isn't a different 917 # object. 918 orig_toc = self._box._toc 919 def refreshed(): 920 return self._box._toc is not orig_toc 921 922 self._box._refresh() 923 self.assertFalse(refreshed()) 924 925 # Now, write something into cur and remove it. This changes 926 # the mtime and should cause a re-read. Note that "sleep 927 # emulation" is still in effect, as skewfactor is -3. 928 filename = os.path.join(self._path, 'cur', 'stray-file') 929 support.create_empty_file(filename) 930 os.unlink(filename) 931 self._box._refresh() 932 self.assertTrue(refreshed()) 933 934 935class _TestSingleFile(TestMailbox): 936 '''Common tests for single-file mailboxes''' 937 938 def test_add_doesnt_rewrite(self): 939 # When only adding messages, flush() should not rewrite the 940 # mailbox file. See issue #9559. 941 942 # Inode number changes if the contents are written to another 943 # file which is then renamed over the original file. So we 944 # must check that the inode number doesn't change. 945 inode_before = os.stat(self._path).st_ino 946 947 self._box.add(self._template % 0) 948 self._box.flush() 949 950 inode_after = os.stat(self._path).st_ino 951 self.assertEqual(inode_before, inode_after) 952 953 # Make sure the message was really added 954 self._box.close() 955 self._box = self._factory(self._path) 956 self.assertEqual(len(self._box), 1) 957 958 def test_permissions_after_flush(self): 959 # See issue #5346 960 961 # Make the mailbox world writable. It's unlikely that the new 962 # mailbox file would have these permissions after flush(), 963 # because umask usually prevents it. 964 mode = os.stat(self._path).st_mode | 0o666 965 os.chmod(self._path, mode) 966 967 self._box.add(self._template % 0) 968 i = self._box.add(self._template % 1) 969 # Need to remove one message to make flush() create a new file 970 self._box.remove(i) 971 self._box.flush() 972 973 self.assertEqual(os.stat(self._path).st_mode, mode) 974 975 976class _TestMboxMMDF(_TestSingleFile): 977 978 def tearDown(self): 979 super().tearDown() 980 self._box.close() 981 self._delete_recursively(self._path) 982 for lock_remnant in glob.glob(glob.escape(self._path) + '.*'): 983 support.unlink(lock_remnant) 984 985 def assertMailboxEmpty(self): 986 with open(self._path) as f: 987 self.assertEqual(f.readlines(), []) 988 989 def test_get_bytes_from(self): 990 # Get bytes representations of messages with _unixfrom. 991 unixfrom = 'From foo@bar blah\n' 992 key0 = self._box.add(unixfrom + self._template % 0) 993 key1 = self._box.add(unixfrom + _sample_message) 994 self.assertEqual(self._box.get_bytes(key0, from_=False), 995 (self._template % 0).encode('ascii')) 996 self.assertEqual(self._box.get_bytes(key1, from_=False), 997 _bytes_sample_message) 998 self.assertEqual(self._box.get_bytes(key0, from_=True), 999 (unixfrom + self._template % 0).encode('ascii')) 1000 self.assertEqual(self._box.get_bytes(key1, from_=True), 1001 unixfrom.encode('ascii') + _bytes_sample_message) 1002 1003 def test_get_string_from(self): 1004 # Get string representations of messages with _unixfrom. 1005 unixfrom = 'From foo@bar blah\n' 1006 key0 = self._box.add(unixfrom + self._template % 0) 1007 key1 = self._box.add(unixfrom + _sample_message) 1008 self.assertEqual(self._box.get_string(key0, from_=False), 1009 self._template % 0) 1010 self.assertEqual(self._box.get_string(key1, from_=False).split('\n'), 1011 _sample_message.split('\n')) 1012 self.assertEqual(self._box.get_string(key0, from_=True), 1013 unixfrom + self._template % 0) 1014 self.assertEqual(self._box.get_string(key1, from_=True).split('\n'), 1015 (unixfrom + _sample_message).split('\n')) 1016 1017 def test_add_from_string(self): 1018 # Add a string starting with 'From ' to the mailbox 1019 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n') 1020 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 1021 self.assertEqual(self._box[key].get_payload(), '0\n') 1022 1023 def test_add_from_bytes(self): 1024 # Add a byte string starting with 'From ' to the mailbox 1025 key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n') 1026 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 1027 self.assertEqual(self._box[key].get_payload(), '0\n') 1028 1029 def test_add_mbox_or_mmdf_message(self): 1030 # Add an mboxMessage or MMDFMessage 1031 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1032 msg = class_('From foo@bar blah\nFrom: foo\n\n0\n') 1033 key = self._box.add(msg) 1034 1035 def test_open_close_open(self): 1036 # Open and inspect previously-created mailbox 1037 values = [self._template % i for i in range(3)] 1038 for value in values: 1039 self._box.add(value) 1040 self._box.close() 1041 mtime = os.path.getmtime(self._path) 1042 self._box = self._factory(self._path) 1043 self.assertEqual(len(self._box), 3) 1044 for key in self._box.iterkeys(): 1045 self.assertIn(self._box.get_string(key), values) 1046 self._box.close() 1047 self.assertEqual(mtime, os.path.getmtime(self._path)) 1048 1049 def test_add_and_close(self): 1050 # Verifying that closing a mailbox doesn't change added items 1051 self._box.add(_sample_message) 1052 for i in range(3): 1053 self._box.add(self._template % i) 1054 self._box.add(_sample_message) 1055 self._box._file.flush() 1056 self._box._file.seek(0) 1057 contents = self._box._file.read() 1058 self._box.close() 1059 with open(self._path, 'rb') as f: 1060 self.assertEqual(contents, f.read()) 1061 self._box = self._factory(self._path) 1062 1063 @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().") 1064 @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().") 1065 def test_lock_conflict(self): 1066 # Fork off a child process that will lock the mailbox temporarily, 1067 # unlock it and exit. 1068 c, p = socket.socketpair() 1069 self.addCleanup(c.close) 1070 self.addCleanup(p.close) 1071 1072 pid = os.fork() 1073 if pid == 0: 1074 # child 1075 try: 1076 # lock the mailbox, and signal the parent it can proceed 1077 self._box.lock() 1078 c.send(b'c') 1079 1080 # wait until the parent is done, and unlock the mailbox 1081 c.recv(1) 1082 self._box.unlock() 1083 finally: 1084 os._exit(0) 1085 1086 # In the parent, wait until the child signals it locked the mailbox. 1087 p.recv(1) 1088 try: 1089 self.assertRaises(mailbox.ExternalClashError, 1090 self._box.lock) 1091 finally: 1092 # Signal the child it can now release the lock and exit. 1093 p.send(b'p') 1094 # Wait for child to exit. Locking should now succeed. 1095 exited_pid, status = os.waitpid(pid, 0) 1096 1097 self._box.lock() 1098 self._box.unlock() 1099 1100 def test_relock(self): 1101 # Test case for bug #1575506: the mailbox class was locking the 1102 # wrong file object in its flush() method. 1103 msg = "Subject: sub\n\nbody\n" 1104 key1 = self._box.add(msg) 1105 self._box.flush() 1106 self._box.close() 1107 1108 self._box = self._factory(self._path) 1109 self._box.lock() 1110 key2 = self._box.add(msg) 1111 self._box.flush() 1112 self.assertTrue(self._box._locked) 1113 self._box.close() 1114 1115 1116class TestMbox(_TestMboxMMDF, unittest.TestCase): 1117 1118 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 1119 1120 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 1121 def test_file_perms(self): 1122 # From bug #3228, we want to verify that the mailbox file isn't executable, 1123 # even if the umask is set to something that would leave executable bits set. 1124 # We only run this test on platforms that support umask. 1125 try: 1126 old_umask = os.umask(0o077) 1127 self._box.close() 1128 os.unlink(self._path) 1129 self._box = mailbox.mbox(self._path, create=True) 1130 self._box.add('') 1131 self._box.close() 1132 finally: 1133 os.umask(old_umask) 1134 1135 st = os.stat(self._path) 1136 perms = st.st_mode 1137 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 1138 1139 def test_terminating_newline(self): 1140 message = email.message.Message() 1141 message['From'] = 'john@example.com' 1142 message.set_payload('No newline at the end') 1143 i = self._box.add(message) 1144 1145 # A newline should have been appended to the payload 1146 message = self._box.get(i) 1147 self.assertEqual(message.get_payload(), 'No newline at the end\n') 1148 1149 def test_message_separator(self): 1150 # Check there's always a single blank line after each message 1151 self._box.add('From: foo\n\n0') # No newline at the end 1152 with open(self._path) as f: 1153 data = f.read() 1154 self.assertEqual(data[-3:], '0\n\n') 1155 1156 self._box.add('From: foo\n\n0\n') # Newline at the end 1157 with open(self._path) as f: 1158 data = f.read() 1159 self.assertEqual(data[-3:], '0\n\n') 1160 1161 1162class TestMMDF(_TestMboxMMDF, unittest.TestCase): 1163 1164 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 1165 1166 1167class TestMH(TestMailbox, unittest.TestCase): 1168 1169 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 1170 1171 def assertMailboxEmpty(self): 1172 self.assertEqual(os.listdir(self._path), ['.mh_sequences']) 1173 1174 def test_list_folders(self): 1175 # List folders 1176 self._box.add_folder('one') 1177 self._box.add_folder('two') 1178 self._box.add_folder('three') 1179 self.assertEqual(len(self._box.list_folders()), 3) 1180 self.assertEqual(set(self._box.list_folders()), 1181 set(('one', 'two', 'three'))) 1182 1183 def test_get_folder(self): 1184 # Open folders 1185 def dummy_factory (s): 1186 return None 1187 self._box = self._factory(self._path, dummy_factory) 1188 1189 new_folder = self._box.add_folder('foo.bar') 1190 folder0 = self._box.get_folder('foo.bar') 1191 folder0.add(self._template % 'bar') 1192 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 1193 folder1 = self._box.get_folder('foo.bar') 1194 self.assertEqual(folder1.get_string(folder1.keys()[0]), 1195 self._template % 'bar') 1196 1197 # Test for bug #1569790: verify that folders returned by .get_folder() 1198 # use the same factory function. 1199 self.assertIs(new_folder._factory, self._box._factory) 1200 self.assertIs(folder0._factory, self._box._factory) 1201 1202 def test_add_and_remove_folders(self): 1203 # Delete folders 1204 self._box.add_folder('one') 1205 self._box.add_folder('two') 1206 self.assertEqual(len(self._box.list_folders()), 2) 1207 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 1208 self._box.remove_folder('one') 1209 self.assertEqual(len(self._box.list_folders()), 1) 1210 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1211 self._box.add_folder('three') 1212 self.assertEqual(len(self._box.list_folders()), 2) 1213 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 1214 self._box.remove_folder('three') 1215 self.assertEqual(len(self._box.list_folders()), 1) 1216 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1217 self._box.remove_folder('two') 1218 self.assertEqual(len(self._box.list_folders()), 0) 1219 self.assertEqual(self._box.list_folders(), []) 1220 1221 def test_sequences(self): 1222 # Get and set sequences 1223 self.assertEqual(self._box.get_sequences(), {}) 1224 msg0 = mailbox.MHMessage(self._template % 0) 1225 msg0.add_sequence('foo') 1226 key0 = self._box.add(msg0) 1227 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 1228 msg1 = mailbox.MHMessage(self._template % 1) 1229 msg1.set_sequences(['bar', 'replied', 'foo']) 1230 key1 = self._box.add(msg1) 1231 self.assertEqual(self._box.get_sequences(), 1232 {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) 1233 msg0.set_sequences(['flagged']) 1234 self._box[key0] = msg0 1235 self.assertEqual(self._box.get_sequences(), 1236 {'foo':[key1], 'bar':[key1], 'replied':[key1], 1237 'flagged':[key0]}) 1238 self._box.remove(key1) 1239 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 1240 1241 def test_issue2625(self): 1242 msg0 = mailbox.MHMessage(self._template % 0) 1243 msg0.add_sequence('foo') 1244 key0 = self._box.add(msg0) 1245 refmsg0 = self._box.get_message(key0) 1246 1247 def test_issue7627(self): 1248 msg0 = mailbox.MHMessage(self._template % 0) 1249 key0 = self._box.add(msg0) 1250 self._box.lock() 1251 self._box.remove(key0) 1252 self._box.unlock() 1253 1254 def test_pack(self): 1255 # Pack the contents of the mailbox 1256 msg0 = mailbox.MHMessage(self._template % 0) 1257 msg1 = mailbox.MHMessage(self._template % 1) 1258 msg2 = mailbox.MHMessage(self._template % 2) 1259 msg3 = mailbox.MHMessage(self._template % 3) 1260 msg0.set_sequences(['foo', 'unseen']) 1261 msg1.set_sequences(['foo']) 1262 msg2.set_sequences(['foo', 'flagged']) 1263 msg3.set_sequences(['foo', 'bar', 'replied']) 1264 key0 = self._box.add(msg0) 1265 key1 = self._box.add(msg1) 1266 key2 = self._box.add(msg2) 1267 key3 = self._box.add(msg3) 1268 self.assertEqual(self._box.get_sequences(), 1269 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1270 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1271 self._box.remove(key2) 1272 self.assertEqual(self._box.get_sequences(), 1273 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1274 'replied':[key3]}) 1275 self._box.pack() 1276 self.assertEqual(self._box.keys(), [1, 2, 3]) 1277 key0 = key0 1278 key1 = key0 + 1 1279 key2 = key1 + 1 1280 self.assertEqual(self._box.get_sequences(), 1281 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1282 1283 # Test case for packing while holding the mailbox locked. 1284 key0 = self._box.add(msg1) 1285 key1 = self._box.add(msg1) 1286 key2 = self._box.add(msg1) 1287 key3 = self._box.add(msg1) 1288 1289 self._box.remove(key0) 1290 self._box.remove(key2) 1291 self._box.lock() 1292 self._box.pack() 1293 self._box.unlock() 1294 self.assertEqual(self._box.get_sequences(), 1295 {'foo':[1, 2, 3, 4, 5], 1296 'unseen':[1], 'bar':[3], 'replied':[3]}) 1297 1298 def _get_lock_path(self): 1299 return os.path.join(self._path, '.mh_sequences.lock') 1300 1301 1302class TestBabyl(_TestSingleFile, unittest.TestCase): 1303 1304 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1305 1306 def assertMailboxEmpty(self): 1307 with open(self._path) as f: 1308 self.assertEqual(f.readlines(), []) 1309 1310 def tearDown(self): 1311 super().tearDown() 1312 self._box.close() 1313 self._delete_recursively(self._path) 1314 for lock_remnant in glob.glob(glob.escape(self._path) + '.*'): 1315 support.unlink(lock_remnant) 1316 1317 def test_labels(self): 1318 # Get labels from the mailbox 1319 self.assertEqual(self._box.get_labels(), []) 1320 msg0 = mailbox.BabylMessage(self._template % 0) 1321 msg0.add_label('foo') 1322 key0 = self._box.add(msg0) 1323 self.assertEqual(self._box.get_labels(), ['foo']) 1324 msg1 = mailbox.BabylMessage(self._template % 1) 1325 msg1.set_labels(['bar', 'answered', 'foo']) 1326 key1 = self._box.add(msg1) 1327 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1328 msg0.set_labels(['blah', 'filed']) 1329 self._box[key0] = msg0 1330 self.assertEqual(set(self._box.get_labels()), 1331 set(['foo', 'bar', 'blah'])) 1332 self._box.remove(key1) 1333 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1334 1335 1336class FakeFileLikeObject: 1337 1338 def __init__(self): 1339 self.closed = False 1340 1341 def close(self): 1342 self.closed = True 1343 1344 1345class FakeMailBox(mailbox.Mailbox): 1346 1347 def __init__(self): 1348 mailbox.Mailbox.__init__(self, '', lambda file: None) 1349 self.files = [FakeFileLikeObject() for i in range(10)] 1350 1351 def get_file(self, key): 1352 return self.files[key] 1353 1354 1355class TestFakeMailBox(unittest.TestCase): 1356 1357 def test_closing_fd(self): 1358 box = FakeMailBox() 1359 for i in range(10): 1360 self.assertFalse(box.files[i].closed) 1361 for i in range(10): 1362 box[i] 1363 for i in range(10): 1364 self.assertTrue(box.files[i].closed) 1365 1366 1367class TestMessage(TestBase, unittest.TestCase): 1368 1369 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1370 1371 def setUp(self): 1372 self._path = support.TESTFN 1373 1374 def tearDown(self): 1375 self._delete_recursively(self._path) 1376 1377 def test_initialize_with_eMM(self): 1378 # Initialize based on email.message.Message instance 1379 eMM = email.message_from_string(_sample_message) 1380 msg = self._factory(eMM) 1381 self._post_initialize_hook(msg) 1382 self._check_sample(msg) 1383 1384 def test_initialize_with_string(self): 1385 # Initialize based on string 1386 msg = self._factory(_sample_message) 1387 self._post_initialize_hook(msg) 1388 self._check_sample(msg) 1389 1390 def test_initialize_with_file(self): 1391 # Initialize based on contents of file 1392 with open(self._path, 'w+') as f: 1393 f.write(_sample_message) 1394 f.seek(0) 1395 msg = self._factory(f) 1396 self._post_initialize_hook(msg) 1397 self._check_sample(msg) 1398 1399 def test_initialize_with_binary_file(self): 1400 # Initialize based on contents of binary file 1401 with open(self._path, 'wb+') as f: 1402 f.write(_bytes_sample_message) 1403 f.seek(0) 1404 msg = self._factory(f) 1405 self._post_initialize_hook(msg) 1406 self._check_sample(msg) 1407 1408 def test_initialize_with_nothing(self): 1409 # Initialize without arguments 1410 msg = self._factory() 1411 self._post_initialize_hook(msg) 1412 self.assertIsInstance(msg, email.message.Message) 1413 self.assertIsInstance(msg, mailbox.Message) 1414 self.assertIsInstance(msg, self._factory) 1415 self.assertEqual(msg.keys(), []) 1416 self.assertFalse(msg.is_multipart()) 1417 self.assertIsNone(msg.get_payload()) 1418 1419 def test_initialize_incorrectly(self): 1420 # Initialize with invalid argument 1421 self.assertRaises(TypeError, lambda: self._factory(object())) 1422 1423 def test_all_eMM_attribues_exist(self): 1424 # Issue 12537 1425 eMM = email.message_from_string(_sample_message) 1426 msg = self._factory(_sample_message) 1427 for attr in eMM.__dict__: 1428 self.assertIn(attr, msg.__dict__, 1429 '{} attribute does not exist'.format(attr)) 1430 1431 def test_become_message(self): 1432 # Take on the state of another message 1433 eMM = email.message_from_string(_sample_message) 1434 msg = self._factory() 1435 msg._become_message(eMM) 1436 self._check_sample(msg) 1437 1438 def test_explain_to(self): 1439 # Copy self's format-specific data to other message formats. 1440 # This test is superficial; better ones are in TestMessageConversion. 1441 msg = self._factory() 1442 for class_ in self.all_mailbox_types: 1443 other_msg = class_() 1444 msg._explain_to(other_msg) 1445 other_msg = email.message.Message() 1446 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1447 1448 def _post_initialize_hook(self, msg): 1449 # Overridden by subclasses to check extra things after initialization 1450 pass 1451 1452 1453class TestMaildirMessage(TestMessage, unittest.TestCase): 1454 1455 _factory = mailbox.MaildirMessage 1456 1457 def _post_initialize_hook(self, msg): 1458 self.assertEqual(msg._subdir, 'new') 1459 self.assertEqual(msg._info, '') 1460 1461 def test_subdir(self): 1462 # Use get_subdir() and set_subdir() 1463 msg = mailbox.MaildirMessage(_sample_message) 1464 self.assertEqual(msg.get_subdir(), 'new') 1465 msg.set_subdir('cur') 1466 self.assertEqual(msg.get_subdir(), 'cur') 1467 msg.set_subdir('new') 1468 self.assertEqual(msg.get_subdir(), 'new') 1469 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1470 self.assertEqual(msg.get_subdir(), 'new') 1471 msg.set_subdir('new') 1472 self.assertEqual(msg.get_subdir(), 'new') 1473 self._check_sample(msg) 1474 1475 def test_flags(self): 1476 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1477 msg = mailbox.MaildirMessage(_sample_message) 1478 self.assertEqual(msg.get_flags(), '') 1479 self.assertEqual(msg.get_subdir(), 'new') 1480 msg.set_flags('F') 1481 self.assertEqual(msg.get_subdir(), 'new') 1482 self.assertEqual(msg.get_flags(), 'F') 1483 msg.set_flags('SDTP') 1484 self.assertEqual(msg.get_flags(), 'DPST') 1485 msg.add_flag('FT') 1486 self.assertEqual(msg.get_flags(), 'DFPST') 1487 msg.remove_flag('TDRP') 1488 self.assertEqual(msg.get_flags(), 'FS') 1489 self.assertEqual(msg.get_subdir(), 'new') 1490 self._check_sample(msg) 1491 1492 def test_date(self): 1493 # Use get_date() and set_date() 1494 msg = mailbox.MaildirMessage(_sample_message) 1495 self.assertLess(abs(msg.get_date() - time.time()), 60) 1496 msg.set_date(0.0) 1497 self.assertEqual(msg.get_date(), 0.0) 1498 1499 def test_info(self): 1500 # Use get_info() and set_info() 1501 msg = mailbox.MaildirMessage(_sample_message) 1502 self.assertEqual(msg.get_info(), '') 1503 msg.set_info('1,foo=bar') 1504 self.assertEqual(msg.get_info(), '1,foo=bar') 1505 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1506 self._check_sample(msg) 1507 1508 def test_info_and_flags(self): 1509 # Test interaction of info and flag methods 1510 msg = mailbox.MaildirMessage(_sample_message) 1511 self.assertEqual(msg.get_info(), '') 1512 msg.set_flags('SF') 1513 self.assertEqual(msg.get_flags(), 'FS') 1514 self.assertEqual(msg.get_info(), '2,FS') 1515 msg.set_info('1,') 1516 self.assertEqual(msg.get_flags(), '') 1517 self.assertEqual(msg.get_info(), '1,') 1518 msg.remove_flag('RPT') 1519 self.assertEqual(msg.get_flags(), '') 1520 self.assertEqual(msg.get_info(), '1,') 1521 msg.add_flag('D') 1522 self.assertEqual(msg.get_flags(), 'D') 1523 self.assertEqual(msg.get_info(), '2,D') 1524 self._check_sample(msg) 1525 1526 1527class _TestMboxMMDFMessage: 1528 1529 _factory = mailbox._mboxMMDFMessage 1530 1531 def _post_initialize_hook(self, msg): 1532 self._check_from(msg) 1533 1534 def test_initialize_with_unixfrom(self): 1535 # Initialize with a message that already has a _unixfrom attribute 1536 msg = mailbox.Message(_sample_message) 1537 msg.set_unixfrom('From foo@bar blah') 1538 msg = mailbox.mboxMessage(msg) 1539 self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from()) 1540 1541 def test_from(self): 1542 # Get and set "From " line 1543 msg = mailbox.mboxMessage(_sample_message) 1544 self._check_from(msg) 1545 msg.set_from('foo bar') 1546 self.assertEqual(msg.get_from(), 'foo bar') 1547 msg.set_from('foo@bar', True) 1548 self._check_from(msg, 'foo@bar') 1549 msg.set_from('blah@temp', time.localtime()) 1550 self._check_from(msg, 'blah@temp') 1551 1552 def test_flags(self): 1553 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1554 msg = mailbox.mboxMessage(_sample_message) 1555 self.assertEqual(msg.get_flags(), '') 1556 msg.set_flags('F') 1557 self.assertEqual(msg.get_flags(), 'F') 1558 msg.set_flags('XODR') 1559 self.assertEqual(msg.get_flags(), 'RODX') 1560 msg.add_flag('FA') 1561 self.assertEqual(msg.get_flags(), 'RODFAX') 1562 msg.remove_flag('FDXA') 1563 self.assertEqual(msg.get_flags(), 'RO') 1564 self._check_sample(msg) 1565 1566 def _check_from(self, msg, sender=None): 1567 # Check contents of "From " line 1568 if sender is None: 1569 sender = "MAILER-DAEMON" 1570 self.assertIsNotNone(re.match( 1571 sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}", 1572 msg.get_from())) 1573 1574 1575class TestMboxMessage(_TestMboxMMDFMessage, TestMessage): 1576 1577 _factory = mailbox.mboxMessage 1578 1579 1580class TestMHMessage(TestMessage, unittest.TestCase): 1581 1582 _factory = mailbox.MHMessage 1583 1584 def _post_initialize_hook(self, msg): 1585 self.assertEqual(msg._sequences, []) 1586 1587 def test_sequences(self): 1588 # Get, set, join, and leave sequences 1589 msg = mailbox.MHMessage(_sample_message) 1590 self.assertEqual(msg.get_sequences(), []) 1591 msg.set_sequences(['foobar']) 1592 self.assertEqual(msg.get_sequences(), ['foobar']) 1593 msg.set_sequences([]) 1594 self.assertEqual(msg.get_sequences(), []) 1595 msg.add_sequence('unseen') 1596 self.assertEqual(msg.get_sequences(), ['unseen']) 1597 msg.add_sequence('flagged') 1598 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1599 msg.add_sequence('flagged') 1600 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1601 msg.remove_sequence('unseen') 1602 self.assertEqual(msg.get_sequences(), ['flagged']) 1603 msg.add_sequence('foobar') 1604 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1605 msg.remove_sequence('replied') 1606 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1607 msg.set_sequences(['foobar', 'replied']) 1608 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1609 1610 1611class TestBabylMessage(TestMessage, unittest.TestCase): 1612 1613 _factory = mailbox.BabylMessage 1614 1615 def _post_initialize_hook(self, msg): 1616 self.assertEqual(msg._labels, []) 1617 1618 def test_labels(self): 1619 # Get, set, join, and leave labels 1620 msg = mailbox.BabylMessage(_sample_message) 1621 self.assertEqual(msg.get_labels(), []) 1622 msg.set_labels(['foobar']) 1623 self.assertEqual(msg.get_labels(), ['foobar']) 1624 msg.set_labels([]) 1625 self.assertEqual(msg.get_labels(), []) 1626 msg.add_label('filed') 1627 self.assertEqual(msg.get_labels(), ['filed']) 1628 msg.add_label('resent') 1629 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1630 msg.add_label('resent') 1631 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1632 msg.remove_label('filed') 1633 self.assertEqual(msg.get_labels(), ['resent']) 1634 msg.add_label('foobar') 1635 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1636 msg.remove_label('unseen') 1637 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1638 msg.set_labels(['foobar', 'answered']) 1639 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1640 1641 def test_visible(self): 1642 # Get, set, and update visible headers 1643 msg = mailbox.BabylMessage(_sample_message) 1644 visible = msg.get_visible() 1645 self.assertEqual(visible.keys(), []) 1646 self.assertIsNone(visible.get_payload()) 1647 visible['User-Agent'] = 'FooBar 1.0' 1648 visible['X-Whatever'] = 'Blah' 1649 self.assertEqual(msg.get_visible().keys(), []) 1650 msg.set_visible(visible) 1651 visible = msg.get_visible() 1652 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1653 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1654 self.assertEqual(visible['X-Whatever'], 'Blah') 1655 self.assertIsNone(visible.get_payload()) 1656 msg.update_visible() 1657 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1658 self.assertIsNone(visible.get_payload()) 1659 visible = msg.get_visible() 1660 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1661 'Subject']) 1662 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1663 self.assertEqual(visible[header], msg[header]) 1664 1665 1666class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage): 1667 1668 _factory = mailbox.MMDFMessage 1669 1670 1671class TestMessageConversion(TestBase, unittest.TestCase): 1672 1673 def test_plain_to_x(self): 1674 # Convert Message to all formats 1675 for class_ in self.all_mailbox_types: 1676 msg_plain = mailbox.Message(_sample_message) 1677 msg = class_(msg_plain) 1678 self._check_sample(msg) 1679 1680 def test_x_to_plain(self): 1681 # Convert all formats to Message 1682 for class_ in self.all_mailbox_types: 1683 msg = class_(_sample_message) 1684 msg_plain = mailbox.Message(msg) 1685 self._check_sample(msg_plain) 1686 1687 def test_x_from_bytes(self): 1688 # Convert all formats to Message 1689 for class_ in self.all_mailbox_types: 1690 msg = class_(_bytes_sample_message) 1691 self._check_sample(msg) 1692 1693 def test_x_to_invalid(self): 1694 # Convert all formats to an invalid format 1695 for class_ in self.all_mailbox_types: 1696 self.assertRaises(TypeError, lambda: class_(False)) 1697 1698 def test_type_specific_attributes_removed_on_conversion(self): 1699 reference = {class_: class_(_sample_message).__dict__ 1700 for class_ in self.all_mailbox_types} 1701 for class1 in self.all_mailbox_types: 1702 for class2 in self.all_mailbox_types: 1703 if class1 is class2: 1704 continue 1705 source = class1(_sample_message) 1706 target = class2(source) 1707 type_specific = [a for a in reference[class1] 1708 if a not in reference[class2]] 1709 for attr in type_specific: 1710 self.assertNotIn(attr, target.__dict__, 1711 "while converting {} to {}".format(class1, class2)) 1712 1713 def test_maildir_to_maildir(self): 1714 # Convert MaildirMessage to MaildirMessage 1715 msg_maildir = mailbox.MaildirMessage(_sample_message) 1716 msg_maildir.set_flags('DFPRST') 1717 msg_maildir.set_subdir('cur') 1718 date = msg_maildir.get_date() 1719 msg = mailbox.MaildirMessage(msg_maildir) 1720 self._check_sample(msg) 1721 self.assertEqual(msg.get_flags(), 'DFPRST') 1722 self.assertEqual(msg.get_subdir(), 'cur') 1723 self.assertEqual(msg.get_date(), date) 1724 1725 def test_maildir_to_mboxmmdf(self): 1726 # Convert MaildirMessage to mboxmessage and MMDFMessage 1727 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1728 ('T', 'D'), ('DFPRST', 'RDFA')) 1729 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1730 msg_maildir = mailbox.MaildirMessage(_sample_message) 1731 msg_maildir.set_date(0.0) 1732 for setting, result in pairs: 1733 msg_maildir.set_flags(setting) 1734 msg = class_(msg_maildir) 1735 self.assertEqual(msg.get_flags(), result) 1736 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1737 time.asctime(time.gmtime(0.0))) 1738 msg_maildir.set_subdir('cur') 1739 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1740 1741 def test_maildir_to_mh(self): 1742 # Convert MaildirMessage to MHMessage 1743 msg_maildir = mailbox.MaildirMessage(_sample_message) 1744 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1745 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1746 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1747 for setting, result in pairs: 1748 msg_maildir.set_flags(setting) 1749 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1750 result) 1751 1752 def test_maildir_to_babyl(self): 1753 # Convert MaildirMessage to Babyl 1754 msg_maildir = mailbox.MaildirMessage(_sample_message) 1755 pairs = (('D', ['unseen']), ('F', ['unseen']), 1756 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1757 ('S', []), ('T', ['unseen', 'deleted']), 1758 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1759 for setting, result in pairs: 1760 msg_maildir.set_flags(setting) 1761 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1762 result) 1763 1764 def test_mboxmmdf_to_maildir(self): 1765 # Convert mboxMessage and MMDFMessage to MaildirMessage 1766 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1767 msg_mboxMMDF = class_(_sample_message) 1768 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1769 pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1770 ('RODFA', 'FRST')) 1771 for setting, result in pairs: 1772 msg_mboxMMDF.set_flags(setting) 1773 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1774 self.assertEqual(msg.get_flags(), result) 1775 self.assertEqual(msg.get_date(), 0.0) 1776 msg_mboxMMDF.set_flags('O') 1777 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1778 'cur') 1779 1780 def test_mboxmmdf_to_mboxmmdf(self): 1781 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1782 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1783 msg_mboxMMDF = class_(_sample_message) 1784 msg_mboxMMDF.set_flags('RODFA') 1785 msg_mboxMMDF.set_from('foo@bar') 1786 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1787 msg2 = class2_(msg_mboxMMDF) 1788 self.assertEqual(msg2.get_flags(), 'RODFA') 1789 self.assertEqual(msg2.get_from(), 'foo@bar') 1790 1791 def test_mboxmmdf_to_mh(self): 1792 # Convert mboxMessage and MMDFMessage to MHMessage 1793 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1794 msg_mboxMMDF = class_(_sample_message) 1795 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1796 ('F', ['unseen', 'flagged']), 1797 ('A', ['unseen', 'replied']), 1798 ('RODFA', ['replied', 'flagged'])) 1799 for setting, result in pairs: 1800 msg_mboxMMDF.set_flags(setting) 1801 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1802 result) 1803 1804 def test_mboxmmdf_to_babyl(self): 1805 # Convert mboxMessage and MMDFMessage to BabylMessage 1806 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1807 msg = class_(_sample_message) 1808 pairs = (('R', []), ('O', ['unseen']), 1809 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1810 ('A', ['unseen', 'answered']), 1811 ('RODFA', ['deleted', 'answered'])) 1812 for setting, result in pairs: 1813 msg.set_flags(setting) 1814 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1815 1816 def test_mh_to_maildir(self): 1817 # Convert MHMessage to MaildirMessage 1818 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1819 for setting, result in pairs: 1820 msg = mailbox.MHMessage(_sample_message) 1821 msg.add_sequence(setting) 1822 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1823 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1824 msg = mailbox.MHMessage(_sample_message) 1825 msg.add_sequence('unseen') 1826 msg.add_sequence('replied') 1827 msg.add_sequence('flagged') 1828 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1829 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1830 1831 def test_mh_to_mboxmmdf(self): 1832 # Convert MHMessage to mboxMessage and MMDFMessage 1833 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1834 for setting, result in pairs: 1835 msg = mailbox.MHMessage(_sample_message) 1836 msg.add_sequence(setting) 1837 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1838 self.assertEqual(class_(msg).get_flags(), result) 1839 msg = mailbox.MHMessage(_sample_message) 1840 msg.add_sequence('unseen') 1841 msg.add_sequence('replied') 1842 msg.add_sequence('flagged') 1843 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1844 self.assertEqual(class_(msg).get_flags(), 'OFA') 1845 1846 def test_mh_to_mh(self): 1847 # Convert MHMessage to MHMessage 1848 msg = mailbox.MHMessage(_sample_message) 1849 msg.add_sequence('unseen') 1850 msg.add_sequence('replied') 1851 msg.add_sequence('flagged') 1852 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1853 ['unseen', 'replied', 'flagged']) 1854 1855 def test_mh_to_babyl(self): 1856 # Convert MHMessage to BabylMessage 1857 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1858 ('flagged', [])) 1859 for setting, result in pairs: 1860 msg = mailbox.MHMessage(_sample_message) 1861 msg.add_sequence(setting) 1862 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1863 msg = mailbox.MHMessage(_sample_message) 1864 msg.add_sequence('unseen') 1865 msg.add_sequence('replied') 1866 msg.add_sequence('flagged') 1867 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1868 ['unseen', 'answered']) 1869 1870 def test_babyl_to_maildir(self): 1871 # Convert BabylMessage to MaildirMessage 1872 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1873 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1874 ('resent', 'PS')) 1875 for setting, result in pairs: 1876 msg = mailbox.BabylMessage(_sample_message) 1877 msg.add_label(setting) 1878 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1879 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1880 msg = mailbox.BabylMessage(_sample_message) 1881 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1882 'edited', 'resent'): 1883 msg.add_label(label) 1884 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1885 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1886 1887 def test_babyl_to_mboxmmdf(self): 1888 # Convert BabylMessage to mboxMessage and MMDFMessage 1889 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1890 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1891 ('resent', 'RO')) 1892 for setting, result in pairs: 1893 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1894 msg = mailbox.BabylMessage(_sample_message) 1895 msg.add_label(setting) 1896 self.assertEqual(class_(msg).get_flags(), result) 1897 msg = mailbox.BabylMessage(_sample_message) 1898 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1899 'edited', 'resent'): 1900 msg.add_label(label) 1901 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1902 self.assertEqual(class_(msg).get_flags(), 'ODA') 1903 1904 def test_babyl_to_mh(self): 1905 # Convert BabylMessage to MHMessage 1906 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1907 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1908 ('resent', [])) 1909 for setting, result in pairs: 1910 msg = mailbox.BabylMessage(_sample_message) 1911 msg.add_label(setting) 1912 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1913 msg = mailbox.BabylMessage(_sample_message) 1914 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1915 'edited', 'resent'): 1916 msg.add_label(label) 1917 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1918 ['unseen', 'replied']) 1919 1920 def test_babyl_to_babyl(self): 1921 # Convert BabylMessage to BabylMessage 1922 msg = mailbox.BabylMessage(_sample_message) 1923 msg.update_visible() 1924 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1925 'edited', 'resent'): 1926 msg.add_label(label) 1927 msg2 = mailbox.BabylMessage(msg) 1928 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1929 'answered', 'forwarded', 'edited', 1930 'resent']) 1931 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1932 for key in msg.get_visible().keys(): 1933 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1934 1935 1936class TestProxyFileBase(TestBase): 1937 1938 def _test_read(self, proxy): 1939 # Read by byte 1940 proxy.seek(0) 1941 self.assertEqual(proxy.read(), b'bar') 1942 proxy.seek(1) 1943 self.assertEqual(proxy.read(), b'ar') 1944 proxy.seek(0) 1945 self.assertEqual(proxy.read(2), b'ba') 1946 proxy.seek(1) 1947 self.assertEqual(proxy.read(-1), b'ar') 1948 proxy.seek(2) 1949 self.assertEqual(proxy.read(1000), b'r') 1950 1951 def _test_readline(self, proxy): 1952 # Read by line 1953 linesep = os.linesep.encode() 1954 proxy.seek(0) 1955 self.assertEqual(proxy.readline(), b'foo' + linesep) 1956 self.assertEqual(proxy.readline(), b'bar' + linesep) 1957 self.assertEqual(proxy.readline(), b'fred' + linesep) 1958 self.assertEqual(proxy.readline(), b'bob') 1959 proxy.seek(2) 1960 self.assertEqual(proxy.readline(), b'o' + linesep) 1961 proxy.seek(6 + 2 * len(os.linesep)) 1962 self.assertEqual(proxy.readline(), b'fred' + linesep) 1963 proxy.seek(6 + 2 * len(os.linesep)) 1964 self.assertEqual(proxy.readline(2), b'fr') 1965 self.assertEqual(proxy.readline(-10), b'ed' + linesep) 1966 1967 def _test_readlines(self, proxy): 1968 # Read multiple lines 1969 linesep = os.linesep.encode() 1970 proxy.seek(0) 1971 self.assertEqual(proxy.readlines(), [b'foo' + linesep, 1972 b'bar' + linesep, 1973 b'fred' + linesep, b'bob']) 1974 proxy.seek(0) 1975 self.assertEqual(proxy.readlines(2), [b'foo' + linesep]) 1976 proxy.seek(3 + len(linesep)) 1977 self.assertEqual(proxy.readlines(4 + len(linesep)), 1978 [b'bar' + linesep, b'fred' + linesep]) 1979 proxy.seek(3) 1980 self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep, 1981 b'fred' + linesep, b'bob']) 1982 1983 def _test_iteration(self, proxy): 1984 # Iterate by line 1985 linesep = os.linesep.encode() 1986 proxy.seek(0) 1987 iterator = iter(proxy) 1988 self.assertEqual(next(iterator), b'foo' + linesep) 1989 self.assertEqual(next(iterator), b'bar' + linesep) 1990 self.assertEqual(next(iterator), b'fred' + linesep) 1991 self.assertEqual(next(iterator), b'bob') 1992 self.assertRaises(StopIteration, next, iterator) 1993 1994 def _test_seek_and_tell(self, proxy): 1995 # Seek and use tell to check position 1996 linesep = os.linesep.encode() 1997 proxy.seek(3) 1998 self.assertEqual(proxy.tell(), 3) 1999 self.assertEqual(proxy.read(len(linesep)), linesep) 2000 proxy.seek(2, 1) 2001 self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep) 2002 proxy.seek(-3 - len(linesep), 2) 2003 self.assertEqual(proxy.read(3), b'bar') 2004 proxy.seek(2, 0) 2005 self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep) 2006 proxy.seek(100) 2007 self.assertFalse(proxy.read()) 2008 2009 def _test_close(self, proxy): 2010 # Close a file 2011 self.assertFalse(proxy.closed) 2012 proxy.close() 2013 self.assertTrue(proxy.closed) 2014 # Issue 11700 subsequent closes should be a no-op. 2015 proxy.close() 2016 self.assertTrue(proxy.closed) 2017 2018 2019class TestProxyFile(TestProxyFileBase, unittest.TestCase): 2020 2021 def setUp(self): 2022 self._path = support.TESTFN 2023 self._file = open(self._path, 'wb+') 2024 2025 def tearDown(self): 2026 self._file.close() 2027 self._delete_recursively(self._path) 2028 2029 def test_initialize(self): 2030 # Initialize and check position 2031 self._file.write(b'foo') 2032 pos = self._file.tell() 2033 proxy0 = mailbox._ProxyFile(self._file) 2034 self.assertEqual(proxy0.tell(), pos) 2035 self.assertEqual(self._file.tell(), pos) 2036 proxy1 = mailbox._ProxyFile(self._file, 0) 2037 self.assertEqual(proxy1.tell(), 0) 2038 self.assertEqual(self._file.tell(), pos) 2039 2040 def test_read(self): 2041 self._file.write(b'bar') 2042 self._test_read(mailbox._ProxyFile(self._file)) 2043 2044 def test_readline(self): 2045 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2046 os.linesep), 'ascii')) 2047 self._test_readline(mailbox._ProxyFile(self._file)) 2048 2049 def test_readlines(self): 2050 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2051 os.linesep), 'ascii')) 2052 self._test_readlines(mailbox._ProxyFile(self._file)) 2053 2054 def test_iteration(self): 2055 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2056 os.linesep), 'ascii')) 2057 self._test_iteration(mailbox._ProxyFile(self._file)) 2058 2059 def test_seek_and_tell(self): 2060 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2061 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 2062 2063 def test_close(self): 2064 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2065 self._test_close(mailbox._ProxyFile(self._file)) 2066 2067 2068class TestPartialFile(TestProxyFileBase, unittest.TestCase): 2069 2070 def setUp(self): 2071 self._path = support.TESTFN 2072 self._file = open(self._path, 'wb+') 2073 2074 def tearDown(self): 2075 self._file.close() 2076 self._delete_recursively(self._path) 2077 2078 def test_initialize(self): 2079 # Initialize and check position 2080 self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii')) 2081 pos = self._file.tell() 2082 proxy = mailbox._PartialFile(self._file, 2, 5) 2083 self.assertEqual(proxy.tell(), 0) 2084 self.assertEqual(self._file.tell(), pos) 2085 2086 def test_read(self): 2087 self._file.write(bytes('***bar***', 'ascii')) 2088 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 2089 2090 def test_readline(self): 2091 self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' % 2092 (os.linesep, os.linesep, os.linesep), 'ascii')) 2093 self._test_readline(mailbox._PartialFile(self._file, 5, 2094 18 + 3 * len(os.linesep))) 2095 2096 def test_readlines(self): 2097 self._file.write(bytes('foo%sbar%sfred%sbob?????' % 2098 (os.linesep, os.linesep, os.linesep), 'ascii')) 2099 self._test_readlines(mailbox._PartialFile(self._file, 0, 2100 13 + 3 * len(os.linesep))) 2101 2102 def test_iteration(self): 2103 self._file.write(bytes('____foo%sbar%sfred%sbob####' % 2104 (os.linesep, os.linesep, os.linesep), 'ascii')) 2105 self._test_iteration(mailbox._PartialFile(self._file, 4, 2106 17 + 3 * len(os.linesep))) 2107 2108 def test_seek_and_tell(self): 2109 self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii')) 2110 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 2111 9 + 2 * len(os.linesep))) 2112 2113 def test_close(self): 2114 self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii')) 2115 self._test_close(mailbox._PartialFile(self._file, 1, 2116 6 + 3 * len(os.linesep))) 2117 2118 2119## Start: tests from the original module (for backward compatibility). 2120 2121FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" 2122DUMMY_MESSAGE = """\ 2123From: some.body@dummy.domain 2124To: me@my.domain 2125Subject: Simple Test 2126 2127This is a dummy message. 2128""" 2129 2130class MaildirTestCase(unittest.TestCase): 2131 2132 def setUp(self): 2133 # create a new maildir mailbox to work with: 2134 self._dir = support.TESTFN 2135 if os.path.isdir(self._dir): 2136 support.rmtree(self._dir) 2137 elif os.path.isfile(self._dir): 2138 support.unlink(self._dir) 2139 os.mkdir(self._dir) 2140 os.mkdir(os.path.join(self._dir, "cur")) 2141 os.mkdir(os.path.join(self._dir, "tmp")) 2142 os.mkdir(os.path.join(self._dir, "new")) 2143 self._counter = 1 2144 self._msgfiles = [] 2145 2146 def tearDown(self): 2147 list(map(os.unlink, self._msgfiles)) 2148 support.rmdir(os.path.join(self._dir, "cur")) 2149 support.rmdir(os.path.join(self._dir, "tmp")) 2150 support.rmdir(os.path.join(self._dir, "new")) 2151 support.rmdir(self._dir) 2152 2153 def createMessage(self, dir, mbox=False): 2154 t = int(time.time() % 1000000) 2155 pid = self._counter 2156 self._counter += 1 2157 filename = ".".join((str(t), str(pid), "myhostname", "mydomain")) 2158 tmpname = os.path.join(self._dir, "tmp", filename) 2159 newname = os.path.join(self._dir, dir, filename) 2160 with open(tmpname, "w") as fp: 2161 self._msgfiles.append(tmpname) 2162 if mbox: 2163 fp.write(FROM_) 2164 fp.write(DUMMY_MESSAGE) 2165 try: 2166 os.link(tmpname, newname) 2167 except (AttributeError, PermissionError): 2168 with open(newname, "w") as fp: 2169 fp.write(DUMMY_MESSAGE) 2170 self._msgfiles.append(newname) 2171 return tmpname 2172 2173 def test_empty_maildir(self): 2174 """Test an empty maildir mailbox""" 2175 # Test for regression on bug #117490: 2176 # Make sure the boxes attribute actually gets set. 2177 self.mbox = mailbox.Maildir(support.TESTFN) 2178 #self.assertTrue(hasattr(self.mbox, "boxes")) 2179 #self.assertEqual(len(self.mbox.boxes), 0) 2180 self.assertIsNone(self.mbox.next()) 2181 self.assertIsNone(self.mbox.next()) 2182 2183 def test_nonempty_maildir_cur(self): 2184 self.createMessage("cur") 2185 self.mbox = mailbox.Maildir(support.TESTFN) 2186 #self.assertEqual(len(self.mbox.boxes), 1) 2187 self.assertIsNotNone(self.mbox.next()) 2188 self.assertIsNone(self.mbox.next()) 2189 self.assertIsNone(self.mbox.next()) 2190 2191 def test_nonempty_maildir_new(self): 2192 self.createMessage("new") 2193 self.mbox = mailbox.Maildir(support.TESTFN) 2194 #self.assertEqual(len(self.mbox.boxes), 1) 2195 self.assertIsNotNone(self.mbox.next()) 2196 self.assertIsNone(self.mbox.next()) 2197 self.assertIsNone(self.mbox.next()) 2198 2199 def test_nonempty_maildir_both(self): 2200 self.createMessage("cur") 2201 self.createMessage("new") 2202 self.mbox = mailbox.Maildir(support.TESTFN) 2203 #self.assertEqual(len(self.mbox.boxes), 2) 2204 self.assertIsNotNone(self.mbox.next()) 2205 self.assertIsNotNone(self.mbox.next()) 2206 self.assertIsNone(self.mbox.next()) 2207 self.assertIsNone(self.mbox.next()) 2208 2209## End: tests from the original module (for backward compatibility). 2210 2211 2212_sample_message = """\ 2213Return-Path: <gkj@gregorykjohnson.com> 2214X-Original-To: gkj+person@localhost 2215Delivered-To: gkj+person@localhost 2216Received: from localhost (localhost [127.0.0.1]) 2217 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2218 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2219Delivered-To: gkj@sundance.gregorykjohnson.com 2220Received: from localhost [127.0.0.1] 2221 by localhost with POP3 (fetchmail-6.2.5) 2222 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2223Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2224 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2225 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2226Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 2227 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2228Date: Wed, 13 Jul 2005 17:23:11 -0400 2229From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> 2230To: gkj@gregorykjohnson.com 2231Subject: Sample message 2232Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> 2233Mime-Version: 1.0 2234Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 2235Content-Disposition: inline 2236User-Agent: Mutt/1.5.9i 2237 2238 2239--NMuMz9nt05w80d4+ 2240Content-Type: text/plain; charset=us-ascii 2241Content-Disposition: inline 2242 2243This is a sample message. 2244 2245-- 2246Gregory K. Johnson 2247 2248--NMuMz9nt05w80d4+ 2249Content-Type: application/octet-stream 2250Content-Disposition: attachment; filename="text.gz" 2251Content-Transfer-Encoding: base64 2252 2253H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22543FYlAAAA 2255 2256--NMuMz9nt05w80d4+-- 2257""" 2258 2259_bytes_sample_message = _sample_message.encode('ascii') 2260 2261_sample_headers = { 2262 "Return-Path":"<gkj@gregorykjohnson.com>", 2263 "X-Original-To":"gkj+person@localhost", 2264 "Delivered-To":"gkj+person@localhost", 2265 "Received":"""from localhost (localhost [127.0.0.1]) 2266 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2267 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2268 "Delivered-To":"gkj@sundance.gregorykjohnson.com", 2269 "Received":"""from localhost [127.0.0.1] 2270 by localhost with POP3 (fetchmail-6.2.5) 2271 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2272 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2273 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2274 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2275 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 2276 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2277 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 2278 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", 2279 "To":"gkj@gregorykjohnson.com", 2280 "Subject":"Sample message", 2281 "Mime-Version":"1.0", 2282 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 2283 "Content-Disposition":"inline", 2284 "User-Agent": "Mutt/1.5.9i" } 2285 2286_sample_payloads = ("""This is a sample message. 2287 2288-- 2289Gregory K. Johnson 2290""", 2291"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22923FYlAAAA 2293""") 2294 2295 2296class MiscTestCase(unittest.TestCase): 2297 def test__all__(self): 2298 blacklist = {"linesep", "fcntl"} 2299 support.check__all__(self, mailbox, blacklist=blacklist) 2300 2301 2302def test_main(): 2303 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH, 2304 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage, 2305 TestMHMessage, TestBabylMessage, TestMMDFMessage, 2306 TestMessageConversion, TestProxyFile, TestPartialFile, 2307 MaildirTestCase, TestFakeMailBox, MiscTestCase) 2308 support.run_unittest(*tests) 2309 support.reap_children() 2310 2311 2312if __name__ == '__main__': 2313 test_main() 2314