• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import sys
3import time
4import stat
5import socket
6import email
7import email.message
8import re
9import io
10import tempfile
11from test import support
12import unittest
13import textwrap
14import mailbox
15import glob
16
17
class TestBase:

    # Every message class exported by the mailbox module.
    all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
                         mailbox.mboxMessage, mailbox.MHMessage,
                         mailbox.BabylMessage, mailbox.MMDFMessage)

    def _check_sample(self, msg):
        # Verify that msg is a mailbox.Message carrying the sample message:
        # the expected headers, and one sub-part per sample payload.
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        for header_name, expected_value in _sample_headers.items():
            self.assertIn(expected_value, msg.get_all(header_name))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for index, expected_body in enumerate(_sample_payloads):
            subpart = msg.get_payload(index)
            # Sub-parts are plain email messages, not mailbox.Message.
            self.assertIsInstance(subpart, email.message.Message)
            self.assertNotIsInstance(subpart, mailbox.Message)
            self.assertEqual(subpart.get_payload(), expected_body)

    def _delete_recursively(self, target):
        # Remove target whether it is a directory tree or a single file;
        # a path that does not exist is silently ignored.
        if os.path.isdir(target):
            support.rmtree(target)
            return
        if os.path.exists(target):
            support.unlink(target)
44
45
46class TestMailbox(TestBase):
47
48    maxDiff = None
49
50    _factory = None     # Overridden by subclasses to reuse tests
51    _template = 'From: foo\n\n%s\n'
52
53    def setUp(self):
54        self._path = support.TESTFN
55        self._delete_recursively(self._path)
56        self._box = self._factory(self._path)
57
58    def tearDown(self):
59        self._box.close()
60        self._delete_recursively(self._path)
61
62    def test_add(self):
63        # Add copies of a sample message
64        keys = []
65        keys.append(self._box.add(self._template % 0))
66        self.assertEqual(len(self._box), 1)
67        keys.append(self._box.add(mailbox.Message(_sample_message)))
68        self.assertEqual(len(self._box), 2)
69        keys.append(self._box.add(email.message_from_string(_sample_message)))
70        self.assertEqual(len(self._box), 3)
71        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
72        self.assertEqual(len(self._box), 4)
73        keys.append(self._box.add(_sample_message))
74        self.assertEqual(len(self._box), 5)
75        keys.append(self._box.add(_bytes_sample_message))
76        self.assertEqual(len(self._box), 6)
77        with self.assertWarns(DeprecationWarning):
78            keys.append(self._box.add(
79                io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
80        self.assertEqual(len(self._box), 7)
81        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
82        for i in (1, 2, 3, 4, 5, 6):
83            self._check_sample(self._box[keys[i]])
84
85    _nonascii_msg = textwrap.dedent("""\
86            From: foo
87            Subject: Falinaptár házhozszállítással. Már rendeltél?
88
89            0
90            """)
91
92    def test_add_invalid_8bit_bytes_header(self):
93        key = self._box.add(self._nonascii_msg.encode('latin-1'))
94        self.assertEqual(len(self._box), 1)
95        self.assertEqual(self._box.get_bytes(key),
96            self._nonascii_msg.encode('latin-1'))
97
98    def test_invalid_nonascii_header_as_string(self):
99        subj = self._nonascii_msg.splitlines()[1]
100        key = self._box.add(subj.encode('latin-1'))
101        self.assertEqual(self._box.get_string(key),
102            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
103            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
104
105    def test_add_nonascii_string_header_raises(self):
106        with self.assertRaisesRegex(ValueError, "ASCII-only"):
107            self._box.add(self._nonascii_msg)
108        self._box.flush()
109        self.assertEqual(len(self._box), 0)
110        self.assertMailboxEmpty()
111
112    def test_add_that_raises_leaves_mailbox_empty(self):
113        def raiser(*args, **kw):
114            raise Exception("a fake error")
115        support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
116        with self.assertRaises(Exception):
117            self._box.add(email.message_from_string("From: Alphöso"))
118        self.assertEqual(len(self._box), 0)
119        self._box.close()
120        self.assertMailboxEmpty()
121
122    _non_latin_bin_msg = textwrap.dedent("""\
123        From: foo@bar.com
124        To: báz
125        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
126        \tJean de Baddie
127        Mime-Version: 1.0
128        Content-Type: text/plain; charset="utf-8"
129        Content-Transfer-Encoding: 8bit
130
131        Да, они летят.
132        """).encode('utf-8')
133
134    def test_add_8bit_body(self):
135        key = self._box.add(self._non_latin_bin_msg)
136        self.assertEqual(self._box.get_bytes(key),
137                         self._non_latin_bin_msg)
138        with self._box.get_file(key) as f:
139            self.assertEqual(f.read(),
140                             self._non_latin_bin_msg.replace(b'\n',
141                                os.linesep.encode()))
142        self.assertEqual(self._box[key].get_payload(),
143                        "Да, они летят.\n")
144
145    def test_add_binary_file(self):
146        with tempfile.TemporaryFile('wb+') as f:
147            f.write(_bytes_sample_message)
148            f.seek(0)
149            key = self._box.add(f)
150        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
151            _bytes_sample_message.split(b'\n'))
152
153    def test_add_binary_nonascii_file(self):
154        with tempfile.TemporaryFile('wb+') as f:
155            f.write(self._non_latin_bin_msg)
156            f.seek(0)
157            key = self._box.add(f)
158        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
159            self._non_latin_bin_msg.split(b'\n'))
160
161    def test_add_text_file_warns(self):
162        with tempfile.TemporaryFile('w+') as f:
163            f.write(_sample_message)
164            f.seek(0)
165            with self.assertWarns(DeprecationWarning):
166                key = self._box.add(f)
167        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
168            _bytes_sample_message.split(b'\n'))
169
170    def test_add_StringIO_warns(self):
171        with self.assertWarns(DeprecationWarning):
172            key = self._box.add(io.StringIO(self._template % "0"))
173        self.assertEqual(self._box.get_string(key), self._template % "0")
174
175    def test_add_nonascii_StringIO_raises(self):
176        with self.assertWarns(DeprecationWarning):
177            with self.assertRaisesRegex(ValueError, "ASCII-only"):
178                self._box.add(io.StringIO(self._nonascii_msg))
179        self.assertEqual(len(self._box), 0)
180        self._box.close()
181        self.assertMailboxEmpty()
182
183    def test_remove(self):
184        # Remove messages using remove()
185        self._test_remove_or_delitem(self._box.remove)
186
187    def test_delitem(self):
188        # Remove messages using __delitem__()
189        self._test_remove_or_delitem(self._box.__delitem__)
190
191    def _test_remove_or_delitem(self, method):
192        # (Used by test_remove() and test_delitem().)
193        key0 = self._box.add(self._template % 0)
194        key1 = self._box.add(self._template % 1)
195        self.assertEqual(len(self._box), 2)
196        method(key0)
197        self.assertEqual(len(self._box), 1)
198        self.assertRaises(KeyError, lambda: self._box[key0])
199        self.assertRaises(KeyError, lambda: method(key0))
200        self.assertEqual(self._box.get_string(key1), self._template % 1)
201        key2 = self._box.add(self._template % 2)
202        self.assertEqual(len(self._box), 2)
203        method(key2)
204        self.assertEqual(len(self._box), 1)
205        self.assertRaises(KeyError, lambda: self._box[key2])
206        self.assertRaises(KeyError, lambda: method(key2))
207        self.assertEqual(self._box.get_string(key1), self._template % 1)
208        method(key1)
209        self.assertEqual(len(self._box), 0)
210        self.assertRaises(KeyError, lambda: self._box[key1])
211        self.assertRaises(KeyError, lambda: method(key1))
212
213    def test_discard(self, repetitions=10):
214        # Discard messages
215        key0 = self._box.add(self._template % 0)
216        key1 = self._box.add(self._template % 1)
217        self.assertEqual(len(self._box), 2)
218        self._box.discard(key0)
219        self.assertEqual(len(self._box), 1)
220        self.assertRaises(KeyError, lambda: self._box[key0])
221        self._box.discard(key0)
222        self.assertEqual(len(self._box), 1)
223        self.assertRaises(KeyError, lambda: self._box[key0])
224
225    def test_get(self):
226        # Retrieve messages using get()
227        key0 = self._box.add(self._template % 0)
228        msg = self._box.get(key0)
229        self.assertEqual(msg['from'], 'foo')
230        self.assertEqual(msg.get_payload(), '0\n')
231        self.assertIsNone(self._box.get('foo'))
232        self.assertIs(self._box.get('foo', False), False)
233        self._box.close()
234        self._box = self._factory(self._path)
235        key1 = self._box.add(self._template % 1)
236        msg = self._box.get(key1)
237        self.assertEqual(msg['from'], 'foo')
238        self.assertEqual(msg.get_payload(), '1\n')
239
240    def test_getitem(self):
241        # Retrieve message using __getitem__()
242        key0 = self._box.add(self._template % 0)
243        msg = self._box[key0]
244        self.assertEqual(msg['from'], 'foo')
245        self.assertEqual(msg.get_payload(), '0\n')
246        self.assertRaises(KeyError, lambda: self._box['foo'])
247        self._box.discard(key0)
248        self.assertRaises(KeyError, lambda: self._box[key0])
249
250    def test_get_message(self):
251        # Get Message representations of messages
252        key0 = self._box.add(self._template % 0)
253        key1 = self._box.add(_sample_message)
254        msg0 = self._box.get_message(key0)
255        self.assertIsInstance(msg0, mailbox.Message)
256        self.assertEqual(msg0['from'], 'foo')
257        self.assertEqual(msg0.get_payload(), '0\n')
258        self._check_sample(self._box.get_message(key1))
259
260    def test_get_bytes(self):
261        # Get bytes representations of messages
262        key0 = self._box.add(self._template % 0)
263        key1 = self._box.add(_sample_message)
264        self.assertEqual(self._box.get_bytes(key0),
265            (self._template % 0).encode('ascii'))
266        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
267
268    def test_get_string(self):
269        # Get string representations of messages
270        key0 = self._box.add(self._template % 0)
271        key1 = self._box.add(_sample_message)
272        self.assertEqual(self._box.get_string(key0), self._template % 0)
273        self.assertEqual(self._box.get_string(key1).split('\n'),
274                         _sample_message.split('\n'))
275
276    def test_get_file(self):
277        # Get file representations of messages
278        key0 = self._box.add(self._template % 0)
279        key1 = self._box.add(_sample_message)
280        with self._box.get_file(key0) as file:
281            data0 = file.read()
282        with self._box.get_file(key1) as file:
283            data1 = file.read()
284        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
285                         self._template % 0)
286        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
287                         _sample_message)
288
289    def test_get_file_can_be_closed_twice(self):
290        # Issue 11700
291        key = self._box.add(_sample_message)
292        f = self._box.get_file(key)
293        f.close()
294        f.close()
295
296    def test_iterkeys(self):
297        # Get keys using iterkeys()
298        self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
299
300    def test_keys(self):
301        # Get keys using keys()
302        self._check_iteration(self._box.keys, do_keys=True, do_values=False)
303
304    def test_itervalues(self):
305        # Get values using itervalues()
306        self._check_iteration(self._box.itervalues, do_keys=False,
307                              do_values=True)
308
309    def test_iter(self):
310        # Get values using __iter__()
311        self._check_iteration(self._box.__iter__, do_keys=False,
312                              do_values=True)
313
314    def test_values(self):
315        # Get values using values()
316        self._check_iteration(self._box.values, do_keys=False, do_values=True)
317
318    def test_iteritems(self):
319        # Get keys and values using iteritems()
320        self._check_iteration(self._box.iteritems, do_keys=True,
321                              do_values=True)
322
323    def test_items(self):
324        # Get keys and values using items()
325        self._check_iteration(self._box.items, do_keys=True, do_values=True)
326
327    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
328        for value in method():
329            self.fail("Not empty")
330        keys, values = [], []
331        for i in range(repetitions):
332            keys.append(self._box.add(self._template % i))
333            values.append(self._template % i)
334        if do_keys and not do_values:
335            returned_keys = list(method())
336        elif do_values and not do_keys:
337            returned_values = list(method())
338        else:
339            returned_keys, returned_values = [], []
340            for key, value in method():
341                returned_keys.append(key)
342                returned_values.append(value)
343        if do_keys:
344            self.assertEqual(len(keys), len(returned_keys))
345            self.assertEqual(set(keys), set(returned_keys))
346        if do_values:
347            count = 0
348            for value in returned_values:
349                self.assertEqual(value['from'], 'foo')
350                self.assertLess(int(value.get_payload()), repetitions)
351                count += 1
352            self.assertEqual(len(values), count)
353
354    def test_contains(self):
355        # Check existence of keys using __contains__()
356        self.assertNotIn('foo', self._box)
357        key0 = self._box.add(self._template % 0)
358        self.assertIn(key0, self._box)
359        self.assertNotIn('foo', self._box)
360        key1 = self._box.add(self._template % 1)
361        self.assertIn(key1, self._box)
362        self.assertIn(key0, self._box)
363        self.assertNotIn('foo', self._box)
364        self._box.remove(key0)
365        self.assertNotIn(key0, self._box)
366        self.assertIn(key1, self._box)
367        self.assertNotIn('foo', self._box)
368        self._box.remove(key1)
369        self.assertNotIn(key1, self._box)
370        self.assertNotIn(key0, self._box)
371        self.assertNotIn('foo', self._box)
372
373    def test_len(self, repetitions=10):
374        # Get message count
375        keys = []
376        for i in range(repetitions):
377            self.assertEqual(len(self._box), i)
378            keys.append(self._box.add(self._template % i))
379            self.assertEqual(len(self._box), i + 1)
380        for i in range(repetitions):
381            self.assertEqual(len(self._box), repetitions - i)
382            self._box.remove(keys[i])
383            self.assertEqual(len(self._box), repetitions - i - 1)
384
385    def test_set_item(self):
386        # Modify messages using __setitem__()
387        key0 = self._box.add(self._template % 'original 0')
388        self.assertEqual(self._box.get_string(key0),
389                         self._template % 'original 0')
390        key1 = self._box.add(self._template % 'original 1')
391        self.assertEqual(self._box.get_string(key1),
392                         self._template % 'original 1')
393        self._box[key0] = self._template % 'changed 0'
394        self.assertEqual(self._box.get_string(key0),
395                         self._template % 'changed 0')
396        self._box[key1] = self._template % 'changed 1'
397        self.assertEqual(self._box.get_string(key1),
398                         self._template % 'changed 1')
399        self._box[key0] = _sample_message
400        self._check_sample(self._box[key0])
401        self._box[key1] = self._box[key0]
402        self._check_sample(self._box[key1])
403        self._box[key0] = self._template % 'original 0'
404        self.assertEqual(self._box.get_string(key0),
405                     self._template % 'original 0')
406        self._check_sample(self._box[key1])
407        self.assertRaises(KeyError,
408                          lambda: self._box.__setitem__('foo', 'bar'))
409        self.assertRaises(KeyError, lambda: self._box['foo'])
410        self.assertEqual(len(self._box), 2)
411
412    def test_clear(self, iterations=10):
413        # Remove all messages using clear()
414        keys = []
415        for i in range(iterations):
416            self._box.add(self._template % i)
417        for i, key in enumerate(keys):
418            self.assertEqual(self._box.get_string(key), self._template % i)
419        self._box.clear()
420        self.assertEqual(len(self._box), 0)
421        for i, key in enumerate(keys):
422            self.assertRaises(KeyError, lambda: self._box.get_string(key))
423
424    def test_pop(self):
425        # Get and remove a message using pop()
426        key0 = self._box.add(self._template % 0)
427        self.assertIn(key0, self._box)
428        key1 = self._box.add(self._template % 1)
429        self.assertIn(key1, self._box)
430        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
431        self.assertNotIn(key0, self._box)
432        self.assertIn(key1, self._box)
433        key2 = self._box.add(self._template % 2)
434        self.assertIn(key2, self._box)
435        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
436        self.assertNotIn(key2, self._box)
437        self.assertIn(key1, self._box)
438        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
439        self.assertNotIn(key1, self._box)
440        self.assertEqual(len(self._box), 0)
441
442    def test_popitem(self, iterations=10):
443        # Get and remove an arbitrary (key, message) using popitem()
444        keys = []
445        for i in range(10):
446            keys.append(self._box.add(self._template % i))
447        seen = []
448        for i in range(10):
449            key, msg = self._box.popitem()
450            self.assertIn(key, keys)
451            self.assertNotIn(key, seen)
452            seen.append(key)
453            self.assertEqual(int(msg.get_payload()), keys.index(key))
454        self.assertEqual(len(self._box), 0)
455        for key in keys:
456            self.assertRaises(KeyError, lambda: self._box[key])
457
458    def test_update(self):
459        # Modify multiple messages using update()
460        key0 = self._box.add(self._template % 'original 0')
461        key1 = self._box.add(self._template % 'original 1')
462        key2 = self._box.add(self._template % 'original 2')
463        self._box.update({key0: self._template % 'changed 0',
464                          key2: _sample_message})
465        self.assertEqual(len(self._box), 3)
466        self.assertEqual(self._box.get_string(key0),
467                     self._template % 'changed 0')
468        self.assertEqual(self._box.get_string(key1),
469                     self._template % 'original 1')
470        self._check_sample(self._box[key2])
471        self._box.update([(key2, self._template % 'changed 2'),
472                    (key1, self._template % 'changed 1'),
473                    (key0, self._template % 'original 0')])
474        self.assertEqual(len(self._box), 3)
475        self.assertEqual(self._box.get_string(key0),
476                     self._template % 'original 0')
477        self.assertEqual(self._box.get_string(key1),
478                     self._template % 'changed 1')
479        self.assertEqual(self._box.get_string(key2),
480                     self._template % 'changed 2')
481        self.assertRaises(KeyError,
482                          lambda: self._box.update({'foo': 'bar',
483                                          key0: self._template % "changed 0"}))
484        self.assertEqual(len(self._box), 3)
485        self.assertEqual(self._box.get_string(key0),
486                     self._template % "changed 0")
487        self.assertEqual(self._box.get_string(key1),
488                     self._template % "changed 1")
489        self.assertEqual(self._box.get_string(key2),
490                     self._template % "changed 2")
491
492    def test_flush(self):
493        # Write changes to disk
494        self._test_flush_or_close(self._box.flush, True)
495
496    def test_popitem_and_flush_twice(self):
497        # See #15036.
498        self._box.add(self._template % 0)
499        self._box.add(self._template % 1)
500        self._box.flush()
501
502        self._box.popitem()
503        self._box.flush()
504        self._box.popitem()
505        self._box.flush()
506
507    def test_lock_unlock(self):
508        # Lock and unlock the mailbox
509        self.assertFalse(os.path.exists(self._get_lock_path()))
510        self._box.lock()
511        self.assertTrue(os.path.exists(self._get_lock_path()))
512        self._box.unlock()
513        self.assertFalse(os.path.exists(self._get_lock_path()))
514
515    def test_close(self):
516        # Close mailbox and flush changes to disk
517        self._test_flush_or_close(self._box.close, False)
518
519    def _test_flush_or_close(self, method, should_call_close):
520        contents = [self._template % i for i in range(3)]
521        self._box.add(contents[0])
522        self._box.add(contents[1])
523        self._box.add(contents[2])
524        oldbox = self._box
525        method()
526        if should_call_close:
527            self._box.close()
528        self._box = self._factory(self._path)
529        keys = self._box.keys()
530        self.assertEqual(len(keys), 3)
531        for key in keys:
532            self.assertIn(self._box.get_string(key), contents)
533        oldbox.close()
534
535    def test_dump_message(self):
536        # Write message representations to disk
537        for input in (email.message_from_string(_sample_message),
538                      _sample_message, io.BytesIO(_bytes_sample_message)):
539            output = io.BytesIO()
540            self._box._dump_message(input, output)
541            self.assertEqual(output.getvalue(),
542                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
543        output = io.BytesIO()
544        self.assertRaises(TypeError,
545                          lambda: self._box._dump_message(None, output))
546
547    def _get_lock_path(self):
548        # Return the path of the dot lock file. May be overridden.
549        return self._path + '.lock'
550
551
class TestMailboxSuperclass(TestBase, unittest.TestCase):

    def test_notimplemented(self):
        # Every method of the abstract Mailbox base class must raise
        # NotImplementedError.
        box = mailbox.Mailbox('path')

        def unimplemented(func, *args):
            self.assertRaises(NotImplementedError, func, *args)

        unimplemented(box.add, '')
        unimplemented(box.remove, '')
        unimplemented(box.__delitem__, '')
        unimplemented(box.discard, '')
        unimplemented(box.__setitem__, '', '')
        unimplemented(box.iterkeys)
        unimplemented(box.keys)
        # The iterator-returning methods are driven one step inside the
        # checked call, so the error may come from either the call or the
        # first next().
        unimplemented(lambda: next(box.itervalues()))
        unimplemented(lambda: next(iter(box)))
        unimplemented(box.values)
        unimplemented(lambda: next(box.iteritems()))
        unimplemented(box.items)
        unimplemented(box.get, '')
        unimplemented(box.__getitem__, '')
        unimplemented(box.get_message, '')
        unimplemented(box.get_string, '')
        unimplemented(box.get_bytes, '')
        unimplemented(box.get_file, '')
        unimplemented(lambda: '' in box)
        unimplemented(box.__contains__, '')
        unimplemented(box.__len__)
        unimplemented(box.clear)
        unimplemented(box.pop, '')
        unimplemented(box.popitem)
        unimplemented(box.update, (('', ''),))
        unimplemented(box.flush)
        unimplemented(box.lock)
        unimplemented(box.unlock)
        unimplemented(box.close)
586
587
588class TestMaildir(TestMailbox, unittest.TestCase):
589
590    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
591
592    def setUp(self):
593        TestMailbox.setUp(self)
594        if (os.name == 'nt') or (sys.platform == 'cygwin'):
595            self._box.colon = '!'
596
597    def assertMailboxEmpty(self):
598        self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
599
600    def test_add_MM(self):
601        # Add a MaildirMessage instance
602        msg = mailbox.MaildirMessage(self._template % 0)
603        msg.set_subdir('cur')
604        msg.set_info('foo')
605        key = self._box.add(msg)
606        self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
607                                                 (key, self._box.colon))))
608
609    def test_get_MM(self):
610        # Get a MaildirMessage instance
611        msg = mailbox.MaildirMessage(self._template % 0)
612        msg.set_subdir('cur')
613        msg.set_flags('RF')
614        key = self._box.add(msg)
615        msg_returned = self._box.get_message(key)
616        self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
617        self.assertEqual(msg_returned.get_subdir(), 'cur')
618        self.assertEqual(msg_returned.get_flags(), 'FR')
619
620    def test_set_MM(self):
621        # Set with a MaildirMessage instance
622        msg0 = mailbox.MaildirMessage(self._template % 0)
623        msg0.set_flags('TP')
624        key = self._box.add(msg0)
625        msg_returned = self._box.get_message(key)
626        self.assertEqual(msg_returned.get_subdir(), 'new')
627        self.assertEqual(msg_returned.get_flags(), 'PT')
628        msg1 = mailbox.MaildirMessage(self._template % 1)
629        self._box[key] = msg1
630        msg_returned = self._box.get_message(key)
631        self.assertEqual(msg_returned.get_subdir(), 'new')
632        self.assertEqual(msg_returned.get_flags(), '')
633        self.assertEqual(msg_returned.get_payload(), '1\n')
634        msg2 = mailbox.MaildirMessage(self._template % 2)
635        msg2.set_info('2,S')
636        self._box[key] = msg2
637        self._box[key] = self._template % 3
638        msg_returned = self._box.get_message(key)
639        self.assertEqual(msg_returned.get_subdir(), 'new')
640        self.assertEqual(msg_returned.get_flags(), 'S')
641        self.assertEqual(msg_returned.get_payload(), '3\n')
642
643    def test_consistent_factory(self):
644        # Add a message.
645        msg = mailbox.MaildirMessage(self._template % 0)
646        msg.set_subdir('cur')
647        msg.set_flags('RF')
648        key = self._box.add(msg)
649
650        # Create new mailbox with
651        class FakeMessage(mailbox.MaildirMessage):
652            pass
653        box = mailbox.Maildir(self._path, factory=FakeMessage)
654        box.colon = self._box.colon
655        msg2 = box.get_message(key)
656        self.assertIsInstance(msg2, FakeMessage)
657
658    def test_initialize_new(self):
659        # Initialize a non-existent mailbox
660        self.tearDown()
661        self._box = mailbox.Maildir(self._path)
662        self._check_basics()
663        self._delete_recursively(self._path)
664        self._box = self._factory(self._path, factory=None)
665        self._check_basics()
666
667    def test_initialize_existing(self):
668        # Initialize an existing mailbox
669        self.tearDown()
670        for subdir in '', 'tmp', 'new', 'cur':
671            os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
672        self._box = mailbox.Maildir(self._path)
673        self._check_basics()
674
675    def _check_basics(self, factory=None):
676        # (Used by test_open_new() and test_open_existing().)
677        self.assertEqual(self._box._path, os.path.abspath(self._path))
678        self.assertEqual(self._box._factory, factory)
679        for subdir in '', 'tmp', 'new', 'cur':
680            path = os.path.join(self._path, subdir)
681            mode = os.stat(path)[stat.ST_MODE]
682            self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
683
684    def test_list_folders(self):
685        # List folders
686        self._box.add_folder('one')
687        self._box.add_folder('two')
688        self._box.add_folder('three')
689        self.assertEqual(len(self._box.list_folders()), 3)
690        self.assertEqual(set(self._box.list_folders()),
691                     set(('one', 'two', 'three')))
692
693    def test_get_folder(self):
694        # Open folders
695        self._box.add_folder('foo.bar')
696        folder0 = self._box.get_folder('foo.bar')
697        folder0.add(self._template % 'bar')
698        self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
699        folder1 = self._box.get_folder('foo.bar')
700        self.assertEqual(folder1.get_string(folder1.keys()[0]),
701                         self._template % 'bar')
702
703    def test_add_and_remove_folders(self):
704        # Delete folders
705        self._box.add_folder('one')
706        self._box.add_folder('two')
707        self.assertEqual(len(self._box.list_folders()), 2)
708        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
709        self._box.remove_folder('one')
710        self.assertEqual(len(self._box.list_folders()), 1)
711        self.assertEqual(set(self._box.list_folders()), set(('two',)))
712        self._box.add_folder('three')
713        self.assertEqual(len(self._box.list_folders()), 2)
714        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
715        self._box.remove_folder('three')
716        self.assertEqual(len(self._box.list_folders()), 1)
717        self.assertEqual(set(self._box.list_folders()), set(('two',)))
718        self._box.remove_folder('two')
719        self.assertEqual(len(self._box.list_folders()), 0)
720        self.assertEqual(self._box.list_folders(), [])
721
722    def test_clean(self):
723        # Remove old files from 'tmp'
724        foo_path = os.path.join(self._path, 'tmp', 'foo')
725        bar_path = os.path.join(self._path, 'tmp', 'bar')
726        with open(foo_path, 'w') as f:
727            f.write("@")
728        with open(bar_path, 'w') as f:
729            f.write("@")
730        self._box.clean()
731        self.assertTrue(os.path.exists(foo_path))
732        self.assertTrue(os.path.exists(bar_path))
733        foo_stat = os.stat(foo_path)
734        os.utime(foo_path, (time.time() - 129600 - 2,
735                            foo_stat.st_mtime))
736        self._box.clean()
737        self.assertFalse(os.path.exists(foo_path))
738        self.assertTrue(os.path.exists(bar_path))
739
740    def test_create_tmp(self, repetitions=10):
741        # Create files in tmp directory
742        hostname = socket.gethostname()
743        if '/' in hostname:
744            hostname = hostname.replace('/', r'\057')
745        if ':' in hostname:
746            hostname = hostname.replace(':', r'\072')
747        pid = os.getpid()
748        pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
749                             r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)")
750        previous_groups = None
751        for x in range(repetitions):
752            tmp_file = self._box._create_tmp()
753            head, tail = os.path.split(tmp_file.name)
754            self.assertEqual(head, os.path.abspath(os.path.join(self._path,
755                                                                "tmp")),
756                             "File in wrong location: '%s'" % head)
757            match = pattern.match(tail)
758            self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
759            groups = match.groups()
760            if previous_groups is not None:
761                self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
762                             "Non-monotonic seconds: '%s' before '%s'" %
763                             (previous_groups[0], groups[0]))
764                if int(groups[0]) == int(previous_groups[0]):
765                    self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
766                                "Non-monotonic milliseconds: '%s' before '%s'" %
767                                (previous_groups[1], groups[1]))
768                self.assertEqual(int(groups[2]), pid,
769                             "Process ID mismatch: '%s' should be '%s'" %
770                             (groups[2], pid))
771                self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
772                             "Non-sequential counter: '%s' before '%s'" %
773                             (previous_groups[3], groups[3]))
774                self.assertEqual(groups[4], hostname,
775                             "Host name mismatch: '%s' should be '%s'" %
776                             (groups[4], hostname))
777            previous_groups = groups
778            tmp_file.write(_bytes_sample_message)
779            tmp_file.seek(0)
780            self.assertEqual(tmp_file.read(), _bytes_sample_message)
781            tmp_file.close()
782        file_count = len(os.listdir(os.path.join(self._path, "tmp")))
783        self.assertEqual(file_count, repetitions,
784                     "Wrong file count: '%s' should be '%s'" %
785                     (file_count, repetitions))
786
787    def test_refresh(self):
788        # Update the table of contents
789        self.assertEqual(self._box._toc, {})
790        key0 = self._box.add(self._template % 0)
791        key1 = self._box.add(self._template % 1)
792        self.assertEqual(self._box._toc, {})
793        self._box._refresh()
794        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
795                                          key1: os.path.join('new', key1)})
796        key2 = self._box.add(self._template % 2)
797        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
798                                          key1: os.path.join('new', key1)})
799        self._box._refresh()
800        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
801                                          key1: os.path.join('new', key1),
802                                          key2: os.path.join('new', key2)})
803
804    def test_refresh_after_safety_period(self):
805        # Issue #13254: Call _refresh after the "file system safety
806        # period" of 2 seconds has passed; _toc should still be
807        # updated because this is the first call to _refresh.
808        key0 = self._box.add(self._template % 0)
809        key1 = self._box.add(self._template % 1)
810
811        self._box = self._factory(self._path)
812        self.assertEqual(self._box._toc, {})
813
814        # Emulate sleeping. Instead of sleeping for 2 seconds, use the
815        # skew factor to make _refresh think that the filesystem
816        # safety period has passed and re-reading the _toc is only
817        # required if mtimes differ.
818        self._box._skewfactor = -3
819
820        self._box._refresh()
821        self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
822
823    def test_lookup(self):
824        # Look up message subpaths in the TOC
825        self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
826        key0 = self._box.add(self._template % 0)
827        self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
828        os.remove(os.path.join(self._path, 'new', key0))
829        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
830        # Be sure that the TOC is read back from disk (see issue #6896
831        # about bad mtime behaviour on some systems).
832        self._box.flush()
833        self.assertRaises(KeyError, lambda: self._box._lookup(key0))
834        self.assertEqual(self._box._toc, {})
835
836    def test_lock_unlock(self):
837        # Lock and unlock the mailbox. For Maildir, this does nothing.
838        self._box.lock()
839        self._box.unlock()
840
841    def test_folder (self):
842        # Test for bug #1569790: verify that folders returned by .get_folder()
843        # use the same factory function.
844        def dummy_factory (s):
845            return None
846        box = self._factory(self._path, factory=dummy_factory)
847        folder = box.add_folder('folder1')
848        self.assertIs(folder._factory, dummy_factory)
849
850        folder1_alias = box.get_folder('folder1')
851        self.assertIs(folder1_alias._factory, dummy_factory)
852
853    def test_directory_in_folder (self):
854        # Test that mailboxes still work if there's a stray extra directory
855        # in a folder.
856        for i in range(10):
857            self._box.add(mailbox.Message(_sample_message))
858
859        # Create a stray directory
860        os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
861
862        # Check that looping still works with the directory present.
863        for msg in self._box:
864            pass
865
866    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
867    def test_file_permissions(self):
868        # Verify that message files are created without execute permissions
869        msg = mailbox.MaildirMessage(self._template % 0)
870        orig_umask = os.umask(0)
871        try:
872            key = self._box.add(msg)
873        finally:
874            os.umask(orig_umask)
875        path = os.path.join(self._path, self._box._lookup(key))
876        mode = os.stat(path).st_mode
877        self.assertFalse(mode & 0o111)
878
879    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
880    def test_folder_file_perms(self):
881        # From bug #3228, we want to verify that the file created inside a Maildir
882        # subfolder isn't marked as executable.
883        orig_umask = os.umask(0)
884        try:
885            subfolder = self._box.add_folder('subfolder')
886        finally:
887            os.umask(orig_umask)
888
889        path = os.path.join(subfolder._path, 'maildirfolder')
890        st = os.stat(path)
891        perms = st.st_mode
892        self.assertFalse((perms & 0o111)) # Execute bits should all be off.
893
894    def test_reread(self):
895        # Do an initial unconditional refresh
896        self._box._refresh()
897
898        # Put the last modified times more than two seconds into the past
899        # (because mtime may have a two second granularity)
900        for subdir in ('cur', 'new'):
901            os.utime(os.path.join(self._box._path, subdir),
902                     (time.time()-5,)*2)
903
904        # Because mtime has a two second granularity in worst case (FAT), a
905        # refresh is done unconditionally if called for within
906        # two-second-plus-a-bit of the last one, just in case the mbox has
907        # changed; so now we have to wait for that interval to expire.
908        #
909        # Because this is a test, emulate sleeping. Instead of
910        # sleeping for 2 seconds, use the skew factor to make _refresh
911        # think that 2 seconds have passed and re-reading the _toc is
912        # only required if mtimes differ.
913        self._box._skewfactor = -3
914
915        # Re-reading causes the ._toc attribute to be assigned a new dictionary
916        # object, so we'll check that the ._toc attribute isn't a different
917        # object.
918        orig_toc = self._box._toc
919        def refreshed():
920            return self._box._toc is not orig_toc
921
922        self._box._refresh()
923        self.assertFalse(refreshed())
924
925        # Now, write something into cur and remove it.  This changes
926        # the mtime and should cause a re-read. Note that "sleep
927        # emulation" is still in effect, as skewfactor is -3.
928        filename = os.path.join(self._path, 'cur', 'stray-file')
929        support.create_empty_file(filename)
930        os.unlink(filename)
931        self._box._refresh()
932        self.assertTrue(refreshed())
933
934
class _TestSingleFile(TestMailbox):
    '''Common tests for single-file mailboxes'''

    def test_add_doesnt_rewrite(self):
        # When only adding messages, flush() should not rewrite the
        # mailbox file. See issue #9559.

        def current_inode():
            # Inode number changes if the contents are written to another
            # file which is then renamed over the original file. So an
            # unchanged inode number means the file was not replaced.
            return os.stat(self._path).st_ino

        inode_before = current_inode()

        self._box.add(self._template % 0)
        self._box.flush()

        inode_after = current_inode()
        self.assertEqual(inode_before, inode_after)

        # Make sure the message was really added: re-open and count.
        self._box.close()
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 1)

    def test_permissions_after_flush(self):
        # See issue #5346

        # Make the mailbox world writable. It's unlikely that the new
        # mailbox file would have these permissions after flush(),
        # because umask usually prevents it.
        expected_mode = os.stat(self._path).st_mode | 0o666
        os.chmod(self._path, expected_mode)

        self._box.add(self._template % 0)
        doomed_key = self._box.add(self._template % 1)
        # Need to remove one message to make flush() create a new file
        self._box.remove(doomed_key)
        self._box.flush()

        mode_after = os.stat(self._path).st_mode
        self.assertEqual(mode_after, expected_mode)
974
975
class _TestMboxMMDF(_TestSingleFile):

    def tearDown(self):
        # The inherited tearDown() already closes the mailbox and deletes
        # its file, so only leftover lock files remain to be removed here.
        super().tearDown()
        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
            support.unlink(lock_remnant)

    def assertMailboxEmpty(self):
        # The mailbox file must contain no lines at all.
        with open(self._path) as f:
            self.assertEqual(f.readlines(), [])

    def test_get_bytes_from(self):
        # Get bytes representations of messages with _unixfrom.
        unixfrom = 'From foo@bar blah\n'
        key0 = self._box.add(unixfrom + self._template % 0)
        key1 = self._box.add(unixfrom + _sample_message)
        self.assertEqual(self._box.get_bytes(key0, from_=False),
            (self._template % 0).encode('ascii'))
        self.assertEqual(self._box.get_bytes(key1, from_=False),
            _bytes_sample_message)
        self.assertEqual(self._box.get_bytes(key0, from_=True),
            (unixfrom + self._template % 0).encode('ascii'))
        self.assertEqual(self._box.get_bytes(key1, from_=True),
            unixfrom.encode('ascii') + _bytes_sample_message)

    def test_get_string_from(self):
        # Get string representations of messages with _unixfrom.
        unixfrom = 'From foo@bar blah\n'
        key0 = self._box.add(unixfrom + self._template % 0)
        key1 = self._box.add(unixfrom + _sample_message)
        self.assertEqual(self._box.get_string(key0, from_=False),
                         self._template % 0)
        self.assertEqual(self._box.get_string(key1, from_=False).split('\n'),
                         _sample_message.split('\n'))
        self.assertEqual(self._box.get_string(key0, from_=True),
                         unixfrom + self._template % 0)
        self.assertEqual(self._box.get_string(key1, from_=True).split('\n'),
                         (unixfrom + _sample_message).split('\n'))

    def test_add_from_string(self):
        # Add a string starting with 'From ' to the mailbox
        key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
        self.assertEqual(self._box[key].get_payload(), '0\n')

    def test_add_from_bytes(self):
        # Add a byte string starting with 'From ' to the mailbox
        key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
        self.assertEqual(self._box[key].get_payload(), '0\n')

    def test_add_mbox_or_mmdf_message(self):
        # Add an mboxMessage or MMDFMessage; adding without an error is
        # all that is checked, so the returned key is not kept.
        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
            msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
            self._box.add(msg)

    def test_open_close_open(self):
        # Open and inspect previously-created mailbox
        values = [self._template % i for i in range(3)]
        for value in values:
            self._box.add(value)
        self._box.close()
        mtime = os.path.getmtime(self._path)
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 3)
        for key in self._box.iterkeys():
            self.assertIn(self._box.get_string(key), values)
        self._box.close()
        # Reading and closing must leave the file's mtime unchanged.
        self.assertEqual(mtime, os.path.getmtime(self._path))

    def test_add_and_close(self):
        # Verifying that closing a mailbox doesn't change added items
        self._box.add(_sample_message)
        for i in range(3):
            self._box.add(self._template % i)
        self._box.add(_sample_message)
        # Snapshot the contents through the mailbox's own file object
        # before closing, then compare with what is on disk afterwards.
        self._box._file.flush()
        self._box._file.seek(0)
        contents = self._box._file.read()
        self._box.close()
        with open(self._path, 'rb') as f:
            self.assertEqual(contents, f.read())
        self._box = self._factory(self._path)

    @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
    @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
    def test_lock_conflict(self):
        # Fork off a child process that will lock the mailbox temporarily,
        # unlock it and exit.
        c, p = socket.socketpair()
        self.addCleanup(c.close)
        self.addCleanup(p.close)

        pid = os.fork()
        if pid == 0:
            # child
            try:
                # lock the mailbox, and signal the parent it can proceed
                self._box.lock()
                c.send(b'c')

                # wait until the parent is done, and unlock the mailbox
                c.recv(1)
                self._box.unlock()
            finally:
                os._exit(0)

        # In the parent, wait until the child signals it locked the mailbox.
        p.recv(1)
        try:
            self.assertRaises(mailbox.ExternalClashError,
                              self._box.lock)
        finally:
            # Signal the child it can now release the lock and exit.
            p.send(b'p')
            # Wait for child to exit.  Locking should now succeed.
            os.waitpid(pid, 0)

        self._box.lock()
        self._box.unlock()

    def test_relock(self):
        # Test case for bug #1575506: the mailbox class was locking the
        # wrong file object in its flush() method.
        msg = "Subject: sub\n\nbody\n"
        self._box.add(msg)
        self._box.flush()
        self._box.close()

        self._box = self._factory(self._path)
        self._box.lock()
        self._box.add(msg)
        self._box.flush()
        # flush() must not have dropped the lock taken above.
        self.assertTrue(self._box._locked)
        self._box.close()
1114
1115
class TestMbox(_TestMboxMMDF, unittest.TestCase):

    def _factory(self, path, factory=None):
        # Build the mbox mailbox under test.
        return mailbox.mbox(path, factory)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_file_perms(self):
        # From bug #3228, we want to verify that the mailbox file isn't executable,
        # even if the umask is set to something that would leave executable bits set.
        # We only run this test on platforms that support umask.

        # Change the umask before entering the try block so the finally
        # clause always has a saved value to restore.
        old_umask = os.umask(0o077)
        try:
            self._box.close()
            os.unlink(self._path)
            self._box = mailbox.mbox(self._path, create=True)
            self._box.add('')
            self._box.close()
        finally:
            os.umask(old_umask)

        st = os.stat(self._path)
        perms = st.st_mode
        self.assertFalse((perms & 0o111)) # Execute bits should all be off.

    def test_terminating_newline(self):
        message = email.message.Message()
        message['From'] = 'john@example.com'
        message.set_payload('No newline at the end')
        i = self._box.add(message)

        # A newline should have been appended to the payload
        message = self._box.get(i)
        self.assertEqual(message.get_payload(), 'No newline at the end\n')

    def test_message_separator(self):
        # Check there's always a single blank line after each message
        self._box.add('From: foo\n\n0')  # No newline at the end
        with open(self._path) as f:
            data = f.read()
            self.assertEqual(data[-3:], '0\n\n')

        self._box.add('From: foo\n\n0\n')  # Newline at the end
        with open(self._path) as f:
            data = f.read()
            self.assertEqual(data[-3:], '0\n\n')
1160
1161
class TestMMDF(_TestMboxMMDF, unittest.TestCase):

    def _factory(self, path, factory=None):
        # Build the MMDF mailbox under test.
        return mailbox.MMDF(path, factory)
1165
1166
class TestMH(TestMailbox, unittest.TestCase):

    def _factory(self, path, factory=None):
        # Build the MH mailbox under test.
        return mailbox.MH(path, factory)

    def assertMailboxEmpty(self):
        # Only the sequences file may remain in the mailbox directory.
        self.assertEqual(os.listdir(self._path), ['.mh_sequences'])

    def test_list_folders(self):
        # List folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 3)
        self.assertEqual(set(self._box.list_folders()),
                         set(('one', 'two', 'three')))

    def test_get_folder(self):
        # Open folders
        def dummy_factory(s):
            return None
        self._box = self._factory(self._path, dummy_factory)

        new_folder = self._box.add_folder('foo.bar')
        folder0 = self._box.get_folder('foo.bar')
        folder0.add(self._template % 'bar')
        self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
        folder1 = self._box.get_folder('foo.bar')
        self.assertEqual(folder1.get_string(folder1.keys()[0]),
                         self._template % 'bar')

        # Test for bug #1569790: verify that folders returned by .get_folder()
        # use the same factory function.
        self.assertIs(new_folder._factory, self._box._factory)
        self.assertIs(folder0._factory, self._box._factory)

    def test_add_and_remove_folders(self):
        # Delete folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
        self._box.remove_folder('one')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
        self._box.remove_folder('three')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.remove_folder('two')
        self.assertEqual(len(self._box.list_folders()), 0)
        self.assertEqual(self._box.list_folders(), [])

    def test_sequences(self):
        # Get and set sequences
        self.assertEqual(self._box.get_sequences(), {})
        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
        msg1 = mailbox.MHMessage(self._template % 1)
        msg1.set_sequences(['bar', 'replied', 'foo'])
        key1 = self._box.add(msg1)
        self.assertEqual(self._box.get_sequences(),
                         {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
        msg0.set_sequences(['flagged'])
        self._box[key0] = msg0
        self.assertEqual(self._box.get_sequences(),
                         {'foo':[key1], 'bar':[key1], 'replied':[key1],
                          'flagged':[key0]})
        self._box.remove(key1)
        self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})

    def test_issue2625(self):
        # Fetching a message that belongs to a sequence must not raise;
        # the returned message itself is not inspected.
        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self._box.get_message(key0)

    def test_issue7627(self):
        # Removing a message while the mailbox is locked must not raise.
        msg0 = mailbox.MHMessage(self._template % 0)
        key0 = self._box.add(msg0)
        self._box.lock()
        self._box.remove(key0)
        self._box.unlock()

    def test_pack(self):
        # Pack the contents of the mailbox
        msg0 = mailbox.MHMessage(self._template % 0)
        msg1 = mailbox.MHMessage(self._template % 1)
        msg2 = mailbox.MHMessage(self._template % 2)
        msg3 = mailbox.MHMessage(self._template % 3)
        msg0.set_sequences(['foo', 'unseen'])
        msg1.set_sequences(['foo'])
        msg2.set_sequences(['foo', 'flagged'])
        msg3.set_sequences(['foo', 'bar', 'replied'])
        key0 = self._box.add(msg0)
        key1 = self._box.add(msg1)
        key2 = self._box.add(msg2)
        key3 = self._box.add(msg3)
        self.assertEqual(self._box.get_sequences(),
                         {'foo':[key0,key1,key2,key3], 'unseen':[key0],
                          'flagged':[key2], 'bar':[key3], 'replied':[key3]})
        self._box.remove(key2)
        self.assertEqual(self._box.get_sequences(),
                         {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
                          'replied':[key3]})
        self._box.pack()
        # After packing, the keys are renumbered consecutively from 1.
        self.assertEqual(self._box.keys(), [1, 2, 3])
        self.assertEqual(self._box.get_sequences(),
                         {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})

        # Test case for packing while holding the mailbox locked.
        # Four more copies are added and the first and third of them
        # removed, leaving gaps for pack() to close.
        first_extra = self._box.add(msg1)
        self._box.add(msg1)
        third_extra = self._box.add(msg1)
        self._box.add(msg1)

        self._box.remove(first_extra)
        self._box.remove(third_extra)
        self._box.lock()
        self._box.pack()
        self._box.unlock()
        self.assertEqual(self._box.get_sequences(),
                         {'foo':[1, 2, 3, 4, 5],
                          'unseen':[1], 'bar':[3], 'replied':[3]})

    def _get_lock_path(self):
        # Path of the lock file that sits next to the sequences file.
        return os.path.join(self._path, '.mh_sequences.lock')
1300
1301
class TestBabyl(_TestSingleFile, unittest.TestCase):

    def _factory(self, path, factory=None):
        # Build the Babyl mailbox under test.
        return mailbox.Babyl(path, factory)

    def assertMailboxEmpty(self):
        # The mailbox file must contain no lines at all.
        with open(self._path) as f:
            self.assertEqual(f.readlines(), [])

    def tearDown(self):
        # The inherited tearDown() already closes the mailbox and deletes
        # its file, so only leftover lock files remain to be removed here.
        super().tearDown()
        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
            support.unlink(lock_remnant)

    def test_labels(self):
        # Get labels from the mailbox
        self.assertEqual(self._box.get_labels(), [])
        msg0 = mailbox.BabylMessage(self._template % 0)
        msg0.add_label('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_labels(), ['foo'])
        msg1 = mailbox.BabylMessage(self._template % 1)
        msg1.set_labels(['bar', 'answered', 'foo'])
        key1 = self._box.add(msg1)
        # 'answered' was set on msg1 but is not expected in the listing.
        self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
        msg0.set_labels(['blah', 'filed'])
        self._box[key0] = msg0
        # Likewise 'filed' is not expected in the listing.
        self.assertEqual(set(self._box.get_labels()),
                         set(['foo', 'bar', 'blah']))
        self._box.remove(key1)
        self.assertEqual(set(self._box.get_labels()), set(['blah']))
1334
1335
class FakeFileLikeObject:
    """Stand-in for a file that only remembers whether close() was called."""

    def __init__(self):
        # Starts out open.
        self.closed = False

    def close(self):
        # Record the close; calling it again keeps the flag set.
        self.closed = True
1343
1344
class FakeMailBox(mailbox.Mailbox):
    """Mailbox whose get_file() hands out pre-built fake file objects."""

    def __init__(self):
        # Empty path and a message factory that always returns None.
        super().__init__('', lambda file: None)
        # Ten fake files, addressed by their index.
        self.files = [FakeFileLikeObject() for _ in range(10)]

    def get_file(self, key):
        # The key is used directly as an index into the fake files.
        return self.files[key]
1353
1354
class TestFakeMailBox(unittest.TestCase):

    def test_closing_fd(self):
        # Looking a message up by key must close the file obtained from
        # get_file().
        box = FakeMailBox()
        keys = range(10)
        for key in keys:
            self.assertFalse(box.files[key].closed)
        # Item access alone is the operation under test.
        for key in keys:
            box[key]
        for key in keys:
            self.assertTrue(box.files[key].closed)
1365
1366
class TestMessage(TestBase, unittest.TestCase):

    _factory = mailbox.Message      # Overridden by subclasses to reuse tests

    def setUp(self):
        self._path = support.TESTFN

    def tearDown(self):
        self._delete_recursively(self._path)

    def test_initialize_with_eMM(self):
        # Initialize based on email.message.Message instance
        parsed = email.message_from_string(_sample_message)
        msg = self._factory(parsed)
        self._post_initialize_hook(msg)
        self._check_sample(msg)

    def test_initialize_with_string(self):
        # Initialize based on string
        msg = self._factory(_sample_message)
        self._post_initialize_hook(msg)
        self._check_sample(msg)

    def test_initialize_with_file(self):
        # Initialize based on contents of file
        with open(self._path, 'w+') as text_file:
            text_file.write(_sample_message)
            text_file.seek(0)
            msg = self._factory(text_file)
            self._post_initialize_hook(msg)
            self._check_sample(msg)

    def test_initialize_with_binary_file(self):
        # Initialize based on contents of binary file
        with open(self._path, 'wb+') as binary_file:
            binary_file.write(_bytes_sample_message)
            binary_file.seek(0)
            msg = self._factory(binary_file)
            self._post_initialize_hook(msg)
            self._check_sample(msg)

    def test_initialize_with_nothing(self):
        # Initialize without arguments
        msg = self._factory()
        self._post_initialize_hook(msg)
        expected_types = (email.message.Message, mailbox.Message,
                          self._factory)
        for expected_type in expected_types:
            self.assertIsInstance(msg, expected_type)
        # An empty message has no headers, is not multipart and has no
        # payload.
        self.assertEqual(msg.keys(), [])
        self.assertFalse(msg.is_multipart())
        self.assertIsNone(msg.get_payload())

    def test_initialize_incorrectly(self):
        # Initialize with invalid argument
        with self.assertRaises(TypeError):
            self._factory(object())

    def test_all_eMM_attribues_exist(self):
        # Issue 12537
        reference = email.message_from_string(_sample_message)
        msg = self._factory(_sample_message)
        own_attributes = msg.__dict__
        for attr in vars(reference):
            self.assertIn(attr, own_attributes,
                '{} attribute does not exist'.format(attr))

    def test_become_message(self):
        # Take on the state of another message
        source = email.message_from_string(_sample_message)
        msg = self._factory()
        msg._become_message(source)
        self._check_sample(msg)

    def test_explain_to(self):
        # Copy self's format-specific data to other message formats.
        # This test is superficial; better ones are in TestMessageConversion.
        msg = self._factory()
        for class_ in self.all_mailbox_types:
            msg._explain_to(class_())
        # A plain email.message.Message is not an acceptable target.
        plain_message = email.message.Message()
        with self.assertRaises(TypeError):
            msg._explain_to(plain_message)

    def _post_initialize_hook(self, msg):
        # Overridden by subclasses to check extra things after initialization
        pass
1451
1452
class TestMaildirMessage(TestMessage, unittest.TestCase):
    # Runs the inherited TestMessage checks against mailbox.MaildirMessage
    # and adds tests for its subdir, flags, date and info accessors.

    _factory = mailbox.MaildirMessage

    def _post_initialize_hook(self, msg):
        # A new message starts in 'new' with an empty info string
        initial_state = (('_subdir', 'new'), ('_info', ''))
        for attribute, expected in initial_state:
            self.assertEqual(getattr(msg, attribute), expected)

    def test_subdir(self):
        # Use get_subdir() and set_subdir()
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_subdir(), 'new')
        for subdir in ('cur', 'new'):
            message.set_subdir(subdir)
            self.assertEqual(message.get_subdir(), subdir)
        # 'tmp' is rejected and must leave the current subdir untouched
        with self.assertRaises(ValueError):
            message.set_subdir('tmp')
        self.assertEqual(message.get_subdir(), 'new')
        message.set_subdir('new')
        self.assertEqual(message.get_subdir(), 'new')
        self._check_sample(message)

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_flags(), '')
        self.assertEqual(message.get_subdir(), 'new')
        message.set_flags('F')
        # Changing flags must not move the message out of 'new'
        self.assertEqual(message.get_subdir(), 'new')
        self.assertEqual(message.get_flags(), 'F')
        # Each step is (mutator, argument, flags expected afterwards)
        steps = (
            (message.set_flags, 'SDTP', 'DPST'),
            (message.add_flag, 'FT', 'DFPST'),
            (message.remove_flag, 'TDRP', 'FS'),
        )
        for mutate, flags, expected in steps:
            mutate(flags)
            self.assertEqual(message.get_flags(), expected)
        self.assertEqual(message.get_subdir(), 'new')
        self._check_sample(message)

    def test_date(self):
        # Use get_date() and set_date()
        message = mailbox.MaildirMessage(_sample_message)
        # The date of a new message must be within a minute of now
        drift = abs(message.get_date() - time.time())
        self.assertLess(drift, 60)
        message.set_date(0.0)
        self.assertEqual(message.get_date(), 0.0)

    def test_info(self):
        # Use get_info() and set_info()
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_info(), '')
        message.set_info('1,foo=bar')
        self.assertEqual(message.get_info(), '1,foo=bar')
        with self.assertRaises(TypeError):
            message.set_info(None)
        self._check_sample(message)

    def test_info_and_flags(self):
        # Test interaction of info and flag methods
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_info(), '')
        # Each step is (mutator, argument, flags afterwards, info afterwards)
        steps = (
            (message.set_flags, 'SF', 'FS', '2,FS'),
            (message.set_info, '1,', '', '1,'),
            (message.remove_flag, 'RPT', '', '1,'),
            (message.add_flag, 'D', 'D', '2,D'),
        )
        for mutate, argument, flags, info in steps:
            mutate(argument)
            self.assertEqual(message.get_flags(), flags)
            self.assertEqual(message.get_info(), info)
        self._check_sample(message)
1525
1526
class _TestMboxMMDFMessage:
    # Mixin holding the tests shared by the mbox and MMDF message formats.
    # Concrete test classes combine it with TestMessage and override
    # _factory.

    _factory = mailbox._mboxMMDFMessage

    def _post_initialize_hook(self, msg):
        # A freshly initialized message must carry the default "From " line
        self._check_from(msg)

    def test_initialize_with_unixfrom(self):
        # Initialize with a message that already has a _unixfrom attribute
        msg = mailbox.Message(_sample_message)
        msg.set_unixfrom('From foo@bar blah')
        msg = mailbox.mboxMessage(msg)
        self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())

    def test_from(self):
        # Get and set "From " line
        msg = mailbox.mboxMessage(_sample_message)
        self._check_from(msg)
        msg.set_from('foo bar')
        self.assertEqual(msg.get_from(), 'foo bar')
        # With True or a time tuple as second argument the line must be the
        # sender followed by a timestamp
        msg.set_from('foo@bar', True)
        self._check_from(msg, 'foo@bar')
        msg.set_from('blah@temp', time.localtime())
        self._check_from(msg, 'blah@temp')

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = mailbox.mboxMessage(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        msg.set_flags('F')
        self.assertEqual(msg.get_flags(), 'F')
        msg.set_flags('XODR')
        self.assertEqual(msg.get_flags(), 'RODX')
        msg.add_flag('FA')
        self.assertEqual(msg.get_flags(), 'RODFAX')
        msg.remove_flag('FDXA')
        self.assertEqual(msg.get_flags(), 'RO')
        self._check_sample(msg)

    def _check_from(self, msg, sender=None):
        # Check that the "From " line starts with `sender` (default
        # "MAILER-DAEMON") followed by an asctime()-shaped timestamp.
        # The sender is escaped so that regex metacharacters in it (such as
        # the '.' of a domain name) are only matched literally.
        if sender is None:
            sender = "MAILER-DAEMON"
        pattern = (re.escape(sender) +
                   r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}")
        self.assertIsNotNone(re.match(pattern, msg.get_from()))
1573
1574
class TestMboxMessage(_TestMboxMMDFMessage, TestMessage, unittest.TestCase):
    # Applies the shared mbox/MMDF tests and the inherited TestMessage tests
    # to mailbox.mboxMessage.  unittest.TestCase is listed explicitly, as in
    # the sibling TestMaildirMessage/TestMHMessage/TestBabylMessage classes,
    # so that collection does not depend on TestMessage's own bases.

    _factory = mailbox.mboxMessage
1578
1579
class TestMHMessage(TestMessage, unittest.TestCase):
    # Runs the inherited TestMessage checks against mailbox.MHMessage and
    # adds a test for its sequence handling.

    _factory = mailbox.MHMessage

    def _post_initialize_hook(self, msg):
        # A new message belongs to no sequences
        initial_sequences = msg._sequences
        self.assertEqual(initial_sequences, [])

    def test_sequences(self):
        # Get, set, join, and leave sequences
        message = mailbox.MHMessage(_sample_message)
        self.assertEqual(message.get_sequences(), [])
        # Each step is (mutator, argument, sequences expected afterwards)
        steps = (
            (message.set_sequences, ['foobar'], ['foobar']),
            (message.set_sequences, [], []),
            (message.add_sequence, 'unseen', ['unseen']),
            (message.add_sequence, 'flagged', ['unseen', 'flagged']),
            # Joining the same sequence again must not duplicate it
            (message.add_sequence, 'flagged', ['unseen', 'flagged']),
            (message.remove_sequence, 'unseen', ['flagged']),
            (message.add_sequence, 'foobar', ['flagged', 'foobar']),
            # Leaving a sequence the message is not in changes nothing
            (message.remove_sequence, 'replied', ['flagged', 'foobar']),
            (message.set_sequences, ['foobar', 'replied'],
             ['foobar', 'replied']),
        )
        for mutate, argument, expected in steps:
            mutate(argument)
            self.assertEqual(message.get_sequences(), expected)
1609
1610
class TestBabylMessage(TestMessage, unittest.TestCase):
    # Runs the inherited TestMessage checks against mailbox.BabylMessage and
    # adds tests for its labels and visible headers.

    _factory = mailbox.BabylMessage

    def _post_initialize_hook(self, msg):
        # A new message carries no labels
        initial_labels = msg._labels
        self.assertEqual(initial_labels, [])

    def test_labels(self):
        # Get, set, join, and leave labels
        message = mailbox.BabylMessage(_sample_message)
        self.assertEqual(message.get_labels(), [])
        # Each step is (mutator, argument, labels expected afterwards)
        steps = (
            (message.set_labels, ['foobar'], ['foobar']),
            (message.set_labels, [], []),
            (message.add_label, 'filed', ['filed']),
            (message.add_label, 'resent', ['filed', 'resent']),
            # Adding the same label again must not duplicate it
            (message.add_label, 'resent', ['filed', 'resent']),
            (message.remove_label, 'filed', ['resent']),
            (message.add_label, 'foobar', ['resent', 'foobar']),
            # Removing a label the message does not have changes nothing
            (message.remove_label, 'unseen', ['resent', 'foobar']),
            (message.set_labels, ['foobar', 'answered'],
             ['foobar', 'answered']),
        )
        for mutate, argument, expected in steps:
            mutate(argument)
            self.assertEqual(message.get_labels(), expected)

    def test_visible(self):
        # Get, set, and update visible headers
        message = mailbox.BabylMessage(_sample_message)
        draft = message.get_visible()
        self.assertEqual(draft.keys(), [])
        self.assertIsNone(draft.get_payload())
        # Editing the returned object must not show up in the message
        # before set_visible() is called
        draft['User-Agent'] = 'FooBar 1.0'
        draft['X-Whatever'] = 'Blah'
        self.assertEqual(message.get_visible().keys(), [])
        message.set_visible(draft)
        stored = message.get_visible()
        custom_headers = ['User-Agent', 'X-Whatever']
        self.assertEqual(stored.keys(), custom_headers)
        self.assertEqual(stored['User-Agent'], 'FooBar 1.0')
        self.assertEqual(stored['X-Whatever'], 'Blah')
        self.assertIsNone(stored.get_payload())
        # update_visible() must leave an already fetched object untouched
        message.update_visible()
        self.assertEqual(stored.keys(), custom_headers)
        self.assertIsNone(stored.get_payload())
        refreshed = message.get_visible()
        synced_headers = ['User-Agent', 'Date', 'From', 'To', 'Subject']
        self.assertEqual(refreshed.keys(), synced_headers)
        for name in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(refreshed[name], message[name])
1664
1665
class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage, unittest.TestCase):
    # Applies the shared mbox/MMDF tests and the inherited TestMessage tests
    # to mailbox.MMDFMessage.  unittest.TestCase is listed explicitly, as in
    # the sibling TestMaildirMessage/TestMHMessage/TestBabylMessage classes,
    # so that collection does not depend on TestMessage's own bases.

    _factory = mailbox.MMDFMessage
1669
1670
class TestMessageConversion(TestBase, unittest.TestCase):
    # Verifies how format-specific state (flags, subdir, sequences, labels,
    # "From " line) carries over when a message of one mailbox type is
    # constructed from a message of another type.

    # The two message types that are always checked together below
    _mbox_like_types = (mailbox.mboxMessage, mailbox.MMDFMessage)

    # Sequences and labels applied together in the combined-state checks
    _mh_sequences = ('unseen', 'replied', 'flagged')
    _babyl_labels = ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
                     'edited', 'resent')

    @staticmethod
    def _mh_sample(*sequences):
        # Return a sample MHMessage added to each given sequence, in order
        message = mailbox.MHMessage(_sample_message)
        for name in sequences:
            message.add_sequence(name)
        return message

    @staticmethod
    def _babyl_sample(*labels):
        # Return a sample BabylMessage given each label, in order
        message = mailbox.BabylMessage(_sample_message)
        for label in labels:
            message.add_label(label)
        return message

    def test_plain_to_x(self):
        # Convert Message to all formats
        for message_type in self.all_mailbox_types:
            plain = mailbox.Message(_sample_message)
            self._check_sample(message_type(plain))

    def test_x_to_plain(self):
        # Convert all formats to Message
        for message_type in self.all_mailbox_types:
            typed = message_type(_sample_message)
            self._check_sample(mailbox.Message(typed))

    def test_x_from_bytes(self):
        # Build every format directly from the bytes sample
        for message_type in self.all_mailbox_types:
            self._check_sample(message_type(_bytes_sample_message))

    def test_x_to_invalid(self):
        # Convert all formats to an invalid format
        for message_type in self.all_mailbox_types:
            with self.assertRaises(TypeError):
                message_type(False)

    def test_type_specific_attributes_removed_on_conversion(self):
        # Attributes that exist only on the source type must not be present
        # on a message of a different type built from it
        baseline = {}
        for message_type in self.all_mailbox_types:
            baseline[message_type] = vars(message_type(_sample_message))
        for source_type in self.all_mailbox_types:
            for target_type in self.all_mailbox_types:
                if source_type is target_type:
                    continue
                source = source_type(_sample_message)
                converted_attrs = vars(target_type(source))
                context = "while converting {} to {}".format(source_type,
                                                             target_type)
                for name in baseline[source_type]:
                    if name not in baseline[target_type]:
                        self.assertNotIn(name, converted_attrs, context)

    def test_maildir_to_maildir(self):
        # Convert MaildirMessage to MaildirMessage
        original = mailbox.MaildirMessage(_sample_message)
        original.set_flags('DFPRST')
        original.set_subdir('cur')
        stamp = original.get_date()
        duplicate = mailbox.MaildirMessage(original)
        self._check_sample(duplicate)
        self.assertEqual(duplicate.get_flags(), 'DFPRST')
        self.assertEqual(duplicate.get_subdir(), 'cur')
        self.assertEqual(duplicate.get_date(), stamp)

    def test_maildir_to_mboxmmdf(self):
        # Convert MaildirMessage to mboxmessage and MMDFMessage
        flag_map = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
                    ('T', 'D'), ('DFPRST', 'RDFA'))
        # The source date is set to 0.0, so the "From " line must carry the
        # epoch rendered in UTC
        epoch_from = 'MAILER-DAEMON %s' % time.asctime(time.gmtime(0.0))
        for target_type in self._mbox_like_types:
            source = mailbox.MaildirMessage(_sample_message)
            source.set_date(0.0)
            for maildir_flags, expected_flags in flag_map:
                source.set_flags(maildir_flags)
                converted = target_type(source)
                self.assertEqual(converted.get_flags(), expected_flags)
                self.assertEqual(converted.get_from(), epoch_from)
            # Once the source is in 'cur' the result also has the 'O' flag
            source.set_subdir('cur')
            self.assertEqual(target_type(source).get_flags(), 'RODFA')

    def test_maildir_to_mh(self):
        # Convert MaildirMessage to MHMessage
        source = mailbox.MaildirMessage(_sample_message)
        sequence_map = (
            ('D', ['unseen']),
            ('F', ['unseen', 'flagged']),
            ('P', ['unseen']),
            ('R', ['unseen', 'replied']),
            ('S', []),
            ('T', ['unseen']),
            ('DFPRST', ['replied', 'flagged']),
        )
        for flags, expected in sequence_map:
            source.set_flags(flags)
            converted = mailbox.MHMessage(source)
            self.assertEqual(converted.get_sequences(), expected)

    def test_maildir_to_babyl(self):
        # Convert MaildirMessage to Babyl
        source = mailbox.MaildirMessage(_sample_message)
        label_map = (
            ('D', ['unseen']),
            ('F', ['unseen']),
            ('P', ['unseen', 'forwarded']),
            ('R', ['unseen', 'answered']),
            ('S', []),
            ('T', ['unseen', 'deleted']),
            ('DFPRST', ['deleted', 'answered', 'forwarded']),
        )
        for flags, expected in label_map:
            source.set_flags(flags)
            converted = mailbox.BabylMessage(source)
            self.assertEqual(converted.get_labels(), expected)

    def test_mboxmmdf_to_maildir(self):
        # Convert mboxMessage and MMDFMessage to MaildirMessage
        for source_type in self._mbox_like_types:
            source = source_type(_sample_message)
            source.set_from('foo@bar', time.gmtime(0.0))
            flag_map = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'),
                        ('A', 'R'), ('RODFA', 'FRST'))
            for mbox_flags, expected_flags in flag_map:
                source.set_flags(mbox_flags)
                converted = mailbox.MaildirMessage(source)
                self.assertEqual(converted.get_flags(), expected_flags)
                self.assertEqual(converted.get_date(), 0.0)
            # The 'O' flag alone must place the result in 'cur'
            source.set_flags('O')
            converted = mailbox.MaildirMessage(source)
            self.assertEqual(converted.get_subdir(), 'cur')

    def test_mboxmmdf_to_mboxmmdf(self):
        # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
        for source_type in self._mbox_like_types:
            source = source_type(_sample_message)
            source.set_flags('RODFA')
            source.set_from('foo@bar')
            for target_type in self._mbox_like_types:
                converted = target_type(source)
                self.assertEqual(converted.get_flags(), 'RODFA')
                self.assertEqual(converted.get_from(), 'foo@bar')

    def test_mboxmmdf_to_mh(self):
        # Convert mboxMessage and MMDFMessage to MHMessage
        for source_type in self._mbox_like_types:
            source = source_type(_sample_message)
            sequence_map = (
                ('R', []),
                ('O', ['unseen']),
                ('D', ['unseen']),
                ('F', ['unseen', 'flagged']),
                ('A', ['unseen', 'replied']),
                ('RODFA', ['replied', 'flagged']),
            )
            for flags, expected in sequence_map:
                source.set_flags(flags)
                converted = mailbox.MHMessage(source)
                self.assertEqual(converted.get_sequences(), expected)

    def test_mboxmmdf_to_babyl(self):
        # Convert mboxMessage and MMDFMessage to BabylMessage
        for source_type in self._mbox_like_types:
            source = source_type(_sample_message)
            label_map = (
                ('R', []),
                ('O', ['unseen']),
                ('D', ['unseen', 'deleted']),
                ('F', ['unseen']),
                ('A', ['unseen', 'answered']),
                ('RODFA', ['deleted', 'answered']),
            )
            for flags, expected in label_map:
                source.set_flags(flags)
                converted = mailbox.BabylMessage(source)
                self.assertEqual(converted.get_labels(), expected)

    def test_mh_to_maildir(self):
        # Convert MHMessage to MaildirMessage
        to_maildir = mailbox.MaildirMessage
        flag_map = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
        for sequence, expected_flags in flag_map:
            source = self._mh_sample(sequence)
            self.assertEqual(to_maildir(source).get_flags(), expected_flags)
            self.assertEqual(to_maildir(source).get_subdir(), 'cur')
        source = self._mh_sample(*self._mh_sequences)
        self.assertEqual(to_maildir(source).get_flags(), 'FR')
        self.assertEqual(to_maildir(source).get_subdir(), 'cur')

    def test_mh_to_mboxmmdf(self):
        # Convert MHMessage to mboxMessage and MMDFMessage
        flag_map = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
        for sequence, expected_flags in flag_map:
            source = self._mh_sample(sequence)
            for target_type in self._mbox_like_types:
                converted = target_type(source)
                self.assertEqual(converted.get_flags(), expected_flags)
        source = self._mh_sample(*self._mh_sequences)
        for target_type in self._mbox_like_types:
            self.assertEqual(target_type(source).get_flags(), 'OFA')

    def test_mh_to_mh(self):
        # Convert MHMessage to MHMessage
        source = self._mh_sample(*self._mh_sequences)
        duplicate = mailbox.MHMessage(source)
        self.assertEqual(duplicate.get_sequences(),
                         ['unseen', 'replied', 'flagged'])

    def test_mh_to_babyl(self):
        # Convert MHMessage to BabylMessage
        label_map = (('unseen', ['unseen']), ('replied', ['answered']),
                     ('flagged', []))
        for sequence, expected in label_map:
            source = self._mh_sample(sequence)
            converted = mailbox.BabylMessage(source)
            self.assertEqual(converted.get_labels(), expected)
        source = self._mh_sample(*self._mh_sequences)
        self.assertEqual(mailbox.BabylMessage(source).get_labels(),
                         ['unseen', 'answered'])

    def test_babyl_to_maildir(self):
        # Convert BabylMessage to MaildirMessage
        to_maildir = mailbox.MaildirMessage
        flag_map = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
                    ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
                    ('resent', 'PS'))
        for label, expected_flags in flag_map:
            source = self._babyl_sample(label)
            self.assertEqual(to_maildir(source).get_flags(), expected_flags)
            self.assertEqual(to_maildir(source).get_subdir(), 'cur')
        source = self._babyl_sample(*self._babyl_labels)
        self.assertEqual(to_maildir(source).get_flags(), 'PRT')
        self.assertEqual(to_maildir(source).get_subdir(), 'cur')

    def test_babyl_to_mboxmmdf(self):
        # Convert BabylMessage to mboxMessage and MMDFMessage
        flag_map = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
                    ('answered', 'ROA'), ('forwarded', 'RO'),
                    ('edited', 'RO'), ('resent', 'RO'))
        for label, expected_flags in flag_map:
            for target_type in self._mbox_like_types:
                source = self._babyl_sample(label)
                converted = target_type(source)
                self.assertEqual(converted.get_flags(), expected_flags)
        source = self._babyl_sample(*self._babyl_labels)
        for target_type in self._mbox_like_types:
            self.assertEqual(target_type(source).get_flags(), 'ODA')

    def test_babyl_to_mh(self):
        # Convert BabylMessage to MHMessage
        sequence_map = (('unseen', ['unseen']), ('deleted', []),
                        ('filed', []), ('answered', ['replied']),
                        ('forwarded', []), ('edited', []), ('resent', []))
        for label, expected in sequence_map:
            source = self._babyl_sample(label)
            converted = mailbox.MHMessage(source)
            self.assertEqual(converted.get_sequences(), expected)
        source = self._babyl_sample(*self._babyl_labels)
        self.assertEqual(mailbox.MHMessage(source).get_sequences(),
                         ['unseen', 'replied'])

    def test_babyl_to_babyl(self):
        # Convert BabylMessage to BabylMessage
        original = mailbox.BabylMessage(_sample_message)
        original.update_visible()
        for label in self._babyl_labels:
            original.add_label(label)
        duplicate = mailbox.BabylMessage(original)
        self.assertEqual(duplicate.get_labels(),
                         ['unseen', 'deleted', 'filed', 'answered',
                          'forwarded', 'edited', 'resent'])
        # Labels and visible headers must both survive the copy
        self.assertEqual(original.get_visible().keys(),
                         duplicate.get_visible().keys())
        for header in original.get_visible().keys():
            self.assertEqual(original.get_visible()[header],
                             duplicate.get_visible()[header])
1934
1935
class TestProxyFileBase(TestBase):
    # Shared checks for file-like proxy objects.  Each _test_* helper takes
    # a proxy whose content the calling test has already prepared.

    def _test_read(self, proxy):
        # Read by byte
        # Each case is (seek offset, read() arguments, expected bytes)
        cases = (
            (0, (), b'bar'),
            (1, (), b'ar'),
            (0, (2,), b'ba'),
            (1, (-1,), b'ar'),
            (2, (1000,), b'r'),
        )
        for offset, read_args, expected in cases:
            proxy.seek(offset)
            self.assertEqual(proxy.read(*read_args), expected)

    def _test_readline(self, proxy):
        # Read by line
        eol = os.linesep.encode()
        lines = [b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob']
        proxy.seek(0)
        for line in lines:
            self.assertEqual(proxy.readline(), line)
        proxy.seek(2)
        self.assertEqual(proxy.readline(), b'o' + eol)
        # Offset of 'fred': past 'foo', 'bar' and their two separators
        fred_offset = 6 + 2 * len(os.linesep)
        proxy.seek(fred_offset)
        self.assertEqual(proxy.readline(), b'fred' + eol)
        proxy.seek(fred_offset)
        self.assertEqual(proxy.readline(2), b'fr')
        self.assertEqual(proxy.readline(-10), b'ed' + eol)

    def _test_readlines(self, proxy):
        # Read multiple lines
        eol = os.linesep.encode()
        foo, bar, fred, bob = b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob'
        proxy.seek(0)
        self.assertEqual(proxy.readlines(), [foo, bar, fred, bob])
        proxy.seek(0)
        self.assertEqual(proxy.readlines(2), [foo])
        proxy.seek(3 + len(eol))
        self.assertEqual(proxy.readlines(4 + len(eol)), [bar, fred])
        proxy.seek(3)
        self.assertEqual(proxy.readlines(1000), [eol, bar, fred, bob])

    def _test_iteration(self, proxy):
        # Iterate by line
        eol = os.linesep.encode()
        proxy.seek(0)
        line_iter = iter(proxy)
        for expected in (b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob'):
            self.assertEqual(next(line_iter), expected)
        with self.assertRaises(StopIteration):
            next(line_iter)

    def _test_seek_and_tell(self, proxy):
        # Seek and use tell to check position
        eol = os.linesep.encode()
        eol_size = len(eol)
        proxy.seek(3)
        self.assertEqual(proxy.tell(), 3)
        self.assertEqual(proxy.read(eol_size), eol)
        proxy.seek(2, os.SEEK_CUR)
        self.assertEqual(proxy.read(1 + eol_size), b'r' + eol)
        proxy.seek(-3 - eol_size, os.SEEK_END)
        self.assertEqual(proxy.read(3), b'bar')
        proxy.seek(2, os.SEEK_SET)
        self.assertEqual(proxy.read(), b'o' + eol + b'bar' + eol)
        # Reading after seeking to offset 100 must return nothing
        proxy.seek(100)
        self.assertFalse(proxy.read())

    def _test_close(self, proxy):
        # Close a file
        self.assertFalse(proxy.closed)
        # Issue 11700: closing an already closed proxy should be a no-op,
        # so close twice and check the state after each call.
        for _ in range(2):
            proxy.close()
            self.assertTrue(proxy.closed)
2017
2018
class TestProxyFile(TestProxyFileBase, unittest.TestCase):
    # Runs the shared proxy checks against mailbox._ProxyFile wrapped around
    # a scratch file opened in setUp().

    def setUp(self):
        self._path = support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _proxy_over(self, text):
        # Write text (ASCII) to the scratch file and wrap it in a _ProxyFile
        self._file.write(text.encode('ascii'))
        return mailbox._ProxyFile(self._file)

    def test_initialize(self):
        # Initialize and check position
        self._file.write(b'foo')
        end = self._file.tell()
        # Without an explicit position the proxy starts where the file is
        at_current = mailbox._ProxyFile(self._file)
        self.assertEqual(at_current.tell(), end)
        self.assertEqual(self._file.tell(), end)
        # An explicit position must not move the underlying file
        at_start = mailbox._ProxyFile(self._file, 0)
        self.assertEqual(at_start.tell(), 0)
        self.assertEqual(self._file.tell(), end)

    def test_read(self):
        self._test_read(self._proxy_over('bar'))

    def test_readline(self):
        content = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        self._test_readline(self._proxy_over(content))

    def test_readlines(self):
        content = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        self._test_readlines(self._proxy_over(content))

    def test_iteration(self):
        content = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        self._test_iteration(self._proxy_over(content))

    def test_seek_and_tell(self):
        content = 'foo' + os.linesep + 'bar' + os.linesep
        self._test_seek_and_tell(self._proxy_over(content))

    def test_close(self):
        content = 'foo' + os.linesep + 'bar' + os.linesep
        self._test_close(self._proxy_over(content))
2066
2067
class TestPartialFile(TestProxyFileBase, unittest.TestCase):
    # Runs the shared proxy checks against mailbox._PartialFile.  Each test
    # pads the expected content with filler characters and passes the
    # start/stop offsets of the content itself.

    def setUp(self):
        self._path = support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def test_initialize(self):
        # Initialize and check position: the partial file reports offset 0
        # at its start and must not move the underlying file's position
        self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
        pos = self._file.tell()
        proxy = mailbox._PartialFile(self._file, 2, 5)
        self.assertEqual(proxy.tell(), 0)
        self.assertEqual(self._file.tell(), pos)

    def test_read(self):
        self._file.write(bytes('***bar***', 'ascii'))
        self._test_read(mailbox._PartialFile(self._file, 3, 6))

    def test_readline(self):
        self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' %
                         (os.linesep, os.linesep, os.linesep), 'ascii'))
        self._test_readline(mailbox._PartialFile(self._file, 5,
                                                 18 + 3 * len(os.linesep)))

    def test_readlines(self):
        self._file.write(bytes('foo%sbar%sfred%sbob?????' %
                         (os.linesep, os.linesep, os.linesep), 'ascii'))
        self._test_readlines(mailbox._PartialFile(self._file, 0,
                                                  13 + 3 * len(os.linesep)))

    def test_iteration(self):
        self._file.write(bytes('____foo%sbar%sfred%sbob####' %
                         (os.linesep, os.linesep, os.linesep), 'ascii'))
        self._test_iteration(mailbox._PartialFile(self._file, 4,
                                                  17 + 3 * len(os.linesep)))

    def test_seek_and_tell(self):
        self._file.write(bytes('(((foo%sbar%s$$$' %
                         (os.linesep, os.linesep), 'ascii'))
        self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
                                                      9 + 2 * len(os.linesep)))

    def test_close(self):
        self._file.write(bytes('&foo%sbar%s^' %
                         (os.linesep, os.linesep), 'ascii'))
        # The window is the content between the '&' and '^' guards: it
        # starts at 1 and spans six letters plus two line separators.
        self._test_close(mailbox._PartialFile(self._file, 1,
                                              7 + 2 * len(os.linesep)))
2117
2118
2119## Start: tests from the original module (for backward compatibility).
2120
# "From " separator line written ahead of the message when an mbox-style
# file is requested.
FROM_ = "From some.body@dummy.domain  Sat Jul 24 13:43:35 2004\n"

# Minimal message (three headers, blank line, one body line) used to
# populate the hand-built maildir directories.
DUMMY_MESSAGE = (
    "From: some.body@dummy.domain\n"
    "To: me@my.domain\n"
    "Subject: Simple Test\n"
    "\n"
    "This is a dummy message.\n"
)
2129
class MaildirTestCase(unittest.TestCase):
    # Legacy tests: iterate hand-built maildir directories through
    # mailbox.Maildir.next().

    def setUp(self):
        # create a new maildir mailbox to work with:
        self._dir = support.TESTFN
        if os.path.isdir(self._dir):
            support.rmtree(self._dir)
        elif os.path.isfile(self._dir):
            support.unlink(self._dir)
        os.mkdir(self._dir)
        os.mkdir(os.path.join(self._dir, "cur"))
        os.mkdir(os.path.join(self._dir, "tmp"))
        os.mkdir(os.path.join(self._dir, "new"))
        self._counter = 1      # takes the place of a pid in file names
        self._msgfiles = []    # every file created; removed in tearDown()

    def tearDown(self):
        # Remove the message files before the directories that hold them
        for path in self._msgfiles:
            os.unlink(path)
        support.rmdir(os.path.join(self._dir, "cur"))
        support.rmdir(os.path.join(self._dir, "tmp"))
        support.rmdir(os.path.join(self._dir, "new"))
        support.rmdir(self._dir)

    def createMessage(self, dir, mbox=False):
        # Create one dummy message in "tmp" and make it available in the
        # subdirectory `dir` under the same file name.  With mbox true the
        # content is preceded by the FROM_ line.  Returns the "tmp" path;
        # both paths are recorded in self._msgfiles for cleanup.
        t = int(time.time() % 1000000)
        pid = self._counter
        self._counter += 1
        filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
        tmpname = os.path.join(self._dir, "tmp", filename)
        newname = os.path.join(self._dir, dir, filename)
        content = (FROM_ if mbox else "") + DUMMY_MESSAGE
        with open(tmpname, "w") as fp:
            # Register before writing so a failed write is still cleaned up
            self._msgfiles.append(tmpname)
            fp.write(content)
        try:
            os.link(tmpname, newname)
        except (AttributeError, PermissionError):
            # Hard links are unavailable: write a copy instead.  It gets
            # the same content as the "tmp" file, FROM_ line included, so
            # both code paths produce identical messages.
            with open(newname, "w") as fp:
                fp.write(content)
        self._msgfiles.append(newname)
        return tmpname

    def test_empty_maildir(self):
        """Test an empty maildir mailbox"""
        # Test for regression on bug #117490:
        # Make sure the boxes attribute actually gets set.
        self.mbox = mailbox.Maildir(support.TESTFN)
        #self.assertTrue(hasattr(self.mbox, "boxes"))
        #self.assertEqual(len(self.mbox.boxes), 0)
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_nonempty_maildir_cur(self):
        # One message in "cur": next() yields it once, then None
        self.createMessage("cur")
        self.mbox = mailbox.Maildir(support.TESTFN)
        #self.assertEqual(len(self.mbox.boxes), 1)
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_nonempty_maildir_new(self):
        # One message in "new": next() yields it once, then None
        self.createMessage("new")
        self.mbox = mailbox.Maildir(support.TESTFN)
        #self.assertEqual(len(self.mbox.boxes), 1)
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_nonempty_maildir_both(self):
        # One message each in "cur" and "new": next() yields two, then None
        self.createMessage("cur")
        self.createMessage("new")
        self.mbox = mailbox.Maildir(support.TESTFN)
        #self.assertEqual(len(self.mbox.boxes), 2)
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
2208
2209## End: tests from the original module (for backward compatibility).
2210
2211
# Raw text of the sample message: a multipart/mixed message made of a
# text/plain part followed by a base64-encoded application/octet-stream
# attachment.  The literal is a byte-exact fixture -- do not reflow it.
_sample_message = """\
Return-Path: <gkj@gregorykjohnson.com>
X-Original-To: gkj+person@localhost
Delivered-To: gkj+person@localhost
Received: from localhost (localhost [127.0.0.1])
        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
Delivered-To: gkj@sundance.gregorykjohnson.com
Received: from localhost [127.0.0.1]
        by localhost with POP3 (fetchmail-6.2.5)
        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
        for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
Date: Wed, 13 Jul 2005 17:23:11 -0400
From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
To: gkj@gregorykjohnson.com
Subject: Sample message
Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
Mime-Version: 1.0
Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
Content-Disposition: inline
User-Agent: Mutt/1.5.9i


--NMuMz9nt05w80d4+
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline

This is a sample message.

--
Gregory K. Johnson

--NMuMz9nt05w80d4+
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="text.gz"
Content-Transfer-Encoding: base64

H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
3FYlAAAA

--NMuMz9nt05w80d4+--
"""

# The same message as ASCII-encoded bytes.
_bytes_sample_message = _sample_message.encode('ascii')
2260
2261_sample_headers = {
2262    "Return-Path":"<gkj@gregorykjohnson.com>",
2263    "X-Original-To":"gkj+person@localhost",
2264    "Delivered-To":"gkj+person@localhost",
2265    "Received":"""from localhost (localhost [127.0.0.1])
2266        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2267        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2268    "Delivered-To":"gkj@sundance.gregorykjohnson.com",
2269    "Received":"""from localhost [127.0.0.1]
2270        by localhost with POP3 (fetchmail-6.2.5)
2271        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2272    "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2273        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2274        for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2275    "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
2276        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2277    "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
2278    "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
2279    "To":"gkj@gregorykjohnson.com",
2280    "Subject":"Sample message",
2281    "Mime-Version":"1.0",
2282    "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
2283    "Content-Disposition":"inline",
2284    "User-Agent": "Mutt/1.5.9i" }
2285
2286_sample_payloads = ("""This is a sample message.
2287
2288--
2289Gregory K. Johnson
2290""",
2291"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
22923FYlAAAA
2293""")
2294
2295
class MiscTestCase(unittest.TestCase):
    """Checks on the mailbox module's namespace rather than its behaviour."""

    def test__all__(self):
        # Names handed to check__all__ as its blacklist (presumably module
        # globals deliberately left out of mailbox.__all__ -- confirm
        # against test.support).
        excluded = {"linesep", "fcntl"}
        support.check__all__(self, mailbox, blacklist=excluded)
2300
2301
def test_main():
    """Run this module's test case classes, then reap child processes.

    The classes are passed to support.run_unittest() in the order listed;
    support.reap_children() is called afterwards.
    """
    test_classes = (
        TestMailboxSuperclass,
        TestMaildir,
        TestMbox,
        TestMMDF,
        TestMH,
        TestBabyl,
        TestMessage,
        TestMaildirMessage,
        TestMboxMessage,
        TestMHMessage,
        TestBabylMessage,
        TestMMDFMessage,
        TestMessageConversion,
        TestProxyFile,
        TestPartialFile,
        MaildirTestCase,
        TestFakeMailBox,
        MiscTestCase,
    )
    support.run_unittest(*test_classes)
    support.reap_children()
2310
2311
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
2314