• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22from __future__ import print_function
23from __future__ import unicode_literals
24
25import os
26import sys
27import time
28import array
29import random
30import unittest
31import weakref
32import abc
33import signal
34import errno
35from itertools import cycle, count
36from collections import deque
37from test import test_support as support
38
39import codecs
40import io  # C implementation of io
41import _pyio as pyio # Python implementation of io
42try:
43    import threading
44except ImportError:
45    threading = None
46
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51    """Get the default TextIOWrapper chunk size"""
52    with io.open(__file__, "r", encoding="latin1") as f:
53        return f._CHUNK_SIZE
54
55
56class MockRawIOWithoutRead:
57    """A RawIO implementation without read(), so as to exercise the default
58    RawIO.read() which calls readinto()."""
59
60    def __init__(self, read_stack=()):
61        self._read_stack = list(read_stack)
62        self._write_stack = []
63        self._reads = 0
64        self._extraneous_reads = 0
65
66    def write(self, b):
67        self._write_stack.append(bytes(b))
68        return len(b)
69
70    def writable(self):
71        return True
72
73    def fileno(self):
74        return 42
75
76    def readable(self):
77        return True
78
79    def seekable(self):
80        return True
81
82    def seek(self, pos, whence):
83        return 0   # wrong but we gotta return something
84
85    def tell(self):
86        return 0   # same comment as above
87
88    def readinto(self, buf):
89        self._reads += 1
90        max_len = len(buf)
91        try:
92            data = self._read_stack[0]
93        except IndexError:
94            self._extraneous_reads += 1
95            return 0
96        if data is None:
97            del self._read_stack[0]
98            return None
99        n = len(data)
100        if len(data) <= max_len:
101            del self._read_stack[0]
102            buf[:n] = data
103            return n
104        else:
105            buf[:] = data[:max_len]
106            self._read_stack[0] = data[max_len:]
107            return max_len
108
109    def truncate(self, pos=None):
110        return pos
111
112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113    pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116    pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121    def read(self, n=None):
122        self._reads += 1
123        try:
124            return self._read_stack.pop(0)
125        except:
126            self._extraneous_reads += 1
127            return b""
128
129class CMockRawIO(MockRawIO, io.RawIOBase):
130    pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133    pass
134
135
136class MisbehavedRawIO(MockRawIO):
137    def write(self, b):
138        return MockRawIO.write(self, b) * 2
139
140    def read(self, n=None):
141        return MockRawIO.read(self, n) * 2
142
143    def seek(self, pos, whence):
144        return -123
145
146    def tell(self):
147        return -456
148
149    def readinto(self, buf):
150        MockRawIO.readinto(self, buf)
151        return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154    pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157    pass
158
159
160class CloseFailureIO(MockRawIO):
161    closed = 0
162
163    def close(self):
164        if not self.closed:
165            self.closed = 1
166            raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169    pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172    pass
173
174
175class MockFileIO:
176
177    def __init__(self, data):
178        self.read_history = []
179        super(MockFileIO, self).__init__(data)
180
181    def read(self, n=None):
182        res = super(MockFileIO, self).read(n)
183        self.read_history.append(None if res is None else len(res))
184        return res
185
186    def readinto(self, b):
187        res = super(MockFileIO, self).readinto(b)
188        self.read_history.append(res)
189        return res
190
191class CMockFileIO(MockFileIO, io.BytesIO):
192    pass
193
194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195    pass
196
197
198class MockNonBlockWriterIO:
199
200    def __init__(self):
201        self._write_stack = []
202        self._blocker_char = None
203
204    def pop_written(self):
205        s = b"".join(self._write_stack)
206        self._write_stack[:] = []
207        return s
208
209    def block_on(self, char):
210        """Block when a given char is encountered."""
211        self._blocker_char = char
212
213    def readable(self):
214        return True
215
216    def seekable(self):
217        return True
218
219    def writable(self):
220        return True
221
222    def write(self, b):
223        b = bytes(b)
224        n = -1
225        if self._blocker_char:
226            try:
227                n = b.index(self._blocker_char)
228            except ValueError:
229                pass
230            else:
231                self._blocker_char = None
232                self._write_stack.append(b[:n])
233                raise self.BlockingIOError(0, "test blocking", n)
234        self._write_stack.append(b)
235        return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238    BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241    BlockingIOError = pyio.BlockingIOError
242
243
244class IOTest(unittest.TestCase):
245
246    def setUp(self):
247        support.unlink(support.TESTFN)
248
249    def tearDown(self):
250        support.unlink(support.TESTFN)
251
252    def write_ops(self, f):
253        self.assertEqual(f.write(b"blah."), 5)
254        f.truncate(0)
255        self.assertEqual(f.tell(), 5)
256        f.seek(0)
257
258        self.assertEqual(f.write(b"blah."), 5)
259        self.assertEqual(f.seek(0), 0)
260        self.assertEqual(f.write(b"Hello."), 6)
261        self.assertEqual(f.tell(), 6)
262        self.assertEqual(f.seek(-1, 1), 5)
263        self.assertEqual(f.tell(), 5)
264        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265        self.assertEqual(f.seek(0), 0)
266        self.assertEqual(f.write(b"h"), 1)
267        self.assertEqual(f.seek(-1, 2), 13)
268        self.assertEqual(f.tell(), 13)
269
270        self.assertEqual(f.truncate(12), 12)
271        self.assertEqual(f.tell(), 13)
272        self.assertRaises(TypeError, f.seek, 0.0)
273
274    def read_ops(self, f, buffered=False):
275        data = f.read(5)
276        self.assertEqual(data, b"hello")
277        data = bytearray(data)
278        self.assertEqual(f.readinto(data), 5)
279        self.assertEqual(data, b" worl")
280        self.assertEqual(f.readinto(data), 2)
281        self.assertEqual(len(data), 5)
282        self.assertEqual(data[:2], b"d\n")
283        self.assertEqual(f.seek(0), 0)
284        self.assertEqual(f.read(20), b"hello world\n")
285        self.assertEqual(f.read(1), b"")
286        self.assertEqual(f.readinto(bytearray(b"x")), 0)
287        self.assertEqual(f.seek(-6, 2), 6)
288        self.assertEqual(f.read(5), b"world")
289        self.assertEqual(f.read(0), b"")
290        self.assertEqual(f.readinto(bytearray()), 0)
291        self.assertEqual(f.seek(-6, 1), 5)
292        self.assertEqual(f.read(5), b" worl")
293        self.assertEqual(f.tell(), 10)
294        self.assertRaises(TypeError, f.seek, 0.0)
295        if buffered:
296            f.seek(0)
297            self.assertEqual(f.read(), b"hello world\n")
298            f.seek(6)
299            self.assertEqual(f.read(), b"world\n")
300            self.assertEqual(f.read(), b"")
301
302    LARGE = 2**31
303
304    def large_file_ops(self, f):
305        assert f.readable()
306        assert f.writable()
307        self.assertEqual(f.seek(self.LARGE), self.LARGE)
308        self.assertEqual(f.tell(), self.LARGE)
309        self.assertEqual(f.write(b"xxx"), 3)
310        self.assertEqual(f.tell(), self.LARGE + 3)
311        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312        self.assertEqual(f.truncate(), self.LARGE + 2)
313        self.assertEqual(f.tell(), self.LARGE + 2)
314        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
316        self.assertEqual(f.tell(), self.LARGE + 2)
317        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318        self.assertEqual(f.seek(-1, 2), self.LARGE)
319        self.assertEqual(f.read(2), b"x")
320
321    def test_invalid_operations(self):
322        # Try writing on a file opened in read mode and vice-versa.
323        for mode in ("w", "wb"):
324            with self.open(support.TESTFN, mode) as fp:
325                self.assertRaises(IOError, fp.read)
326                self.assertRaises(IOError, fp.readline)
327        with self.open(support.TESTFN, "rb") as fp:
328            self.assertRaises(IOError, fp.write, b"blah")
329            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
330        with self.open(support.TESTFN, "r") as fp:
331            self.assertRaises(IOError, fp.write, "blah")
332            self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
334    def test_raw_file_io(self):
335        with self.open(support.TESTFN, "wb", buffering=0) as f:
336            self.assertEqual(f.readable(), False)
337            self.assertEqual(f.writable(), True)
338            self.assertEqual(f.seekable(), True)
339            self.write_ops(f)
340        with self.open(support.TESTFN, "rb", buffering=0) as f:
341            self.assertEqual(f.readable(), True)
342            self.assertEqual(f.writable(), False)
343            self.assertEqual(f.seekable(), True)
344            self.read_ops(f)
345
346    def test_buffered_file_io(self):
347        with self.open(support.TESTFN, "wb") as f:
348            self.assertEqual(f.readable(), False)
349            self.assertEqual(f.writable(), True)
350            self.assertEqual(f.seekable(), True)
351            self.write_ops(f)
352        with self.open(support.TESTFN, "rb") as f:
353            self.assertEqual(f.readable(), True)
354            self.assertEqual(f.writable(), False)
355            self.assertEqual(f.seekable(), True)
356            self.read_ops(f, True)
357
358    def test_readline(self):
359        with self.open(support.TESTFN, "wb") as f:
360            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361        with self.open(support.TESTFN, "rb") as f:
362            self.assertEqual(f.readline(), b"abc\n")
363            self.assertEqual(f.readline(10), b"def\n")
364            self.assertEqual(f.readline(2), b"xy")
365            self.assertEqual(f.readline(4), b"zzy\n")
366            self.assertEqual(f.readline(), b"foo\x00bar\n")
367            self.assertEqual(f.readline(None), b"another line")
368            self.assertRaises(TypeError, f.readline, 5.3)
369        with self.open(support.TESTFN, "r") as f:
370            self.assertRaises(TypeError, f.readline, 5.3)
371
372    def test_raw_bytes_io(self):
373        f = self.BytesIO()
374        self.write_ops(f)
375        data = f.getvalue()
376        self.assertEqual(data, b"hello world\n")
377        f = self.BytesIO(data)
378        self.read_ops(f, True)
379
380    def test_large_file_ops(self):
381        # On Windows and Mac OSX this test comsumes large resources; It takes
382        # a long time to build the >2GB file and takes >2GB of disk space
383        # therefore the resource must be enabled to run this test.
384        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385            if not support.is_resource_enabled("largefile"):
386                print("\nTesting large file ops skipped on %s." % sys.platform,
387                      file=sys.stderr)
388                print("It requires %d bytes and a long time." % self.LARGE,
389                      file=sys.stderr)
390                print("Use 'regrtest.py -u largefile test_io' to run it.",
391                      file=sys.stderr)
392                return
393        with self.open(support.TESTFN, "w+b", 0) as f:
394            self.large_file_ops(f)
395        with self.open(support.TESTFN, "w+b") as f:
396            self.large_file_ops(f)
397
398    def test_with_open(self):
399        for bufsize in (0, 1, 100):
400            f = None
401            with self.open(support.TESTFN, "wb", bufsize) as f:
402                f.write(b"xxx")
403            self.assertEqual(f.closed, True)
404            f = None
405            try:
406                with self.open(support.TESTFN, "wb", bufsize) as f:
407                    1 // 0
408            except ZeroDivisionError:
409                self.assertEqual(f.closed, True)
410            else:
411                self.fail("1 // 0 didn't raise an exception")
412
413    # issue 5008
414    def test_append_mode_tell(self):
415        with self.open(support.TESTFN, "wb") as f:
416            f.write(b"xxx")
417        with self.open(support.TESTFN, "ab", buffering=0) as f:
418            self.assertEqual(f.tell(), 3)
419        with self.open(support.TESTFN, "ab") as f:
420            self.assertEqual(f.tell(), 3)
421        with self.open(support.TESTFN, "a") as f:
422            self.assertTrue(f.tell() > 0)
423
424    def test_destructor(self):
425        record = []
426        class MyFileIO(self.FileIO):
427            def __del__(self):
428                record.append(1)
429                try:
430                    f = super(MyFileIO, self).__del__
431                except AttributeError:
432                    pass
433                else:
434                    f()
435            def close(self):
436                record.append(2)
437                super(MyFileIO, self).close()
438            def flush(self):
439                record.append(3)
440                super(MyFileIO, self).flush()
441        f = MyFileIO(support.TESTFN, "wb")
442        f.write(b"xxx")
443        del f
444        support.gc_collect()
445        self.assertEqual(record, [1, 2, 3])
446        with self.open(support.TESTFN, "rb") as f:
447            self.assertEqual(f.read(), b"xxx")
448
449    def _check_base_destructor(self, base):
450        record = []
451        class MyIO(base):
452            def __init__(self):
453                # This exercises the availability of attributes on object
454                # destruction.
455                # (in the C version, close() is called by the tp_dealloc
456                # function, not by __del__)
457                self.on_del = 1
458                self.on_close = 2
459                self.on_flush = 3
460            def __del__(self):
461                record.append(self.on_del)
462                try:
463                    f = super(MyIO, self).__del__
464                except AttributeError:
465                    pass
466                else:
467                    f()
468            def close(self):
469                record.append(self.on_close)
470                super(MyIO, self).close()
471            def flush(self):
472                record.append(self.on_flush)
473                super(MyIO, self).flush()
474        f = MyIO()
475        del f
476        support.gc_collect()
477        self.assertEqual(record, [1, 2, 3])
478
479    def test_IOBase_destructor(self):
480        self._check_base_destructor(self.IOBase)
481
482    def test_RawIOBase_destructor(self):
483        self._check_base_destructor(self.RawIOBase)
484
485    def test_BufferedIOBase_destructor(self):
486        self._check_base_destructor(self.BufferedIOBase)
487
488    def test_TextIOBase_destructor(self):
489        self._check_base_destructor(self.TextIOBase)
490
491    def test_close_flushes(self):
492        with self.open(support.TESTFN, "wb") as f:
493            f.write(b"xxx")
494        with self.open(support.TESTFN, "rb") as f:
495            self.assertEqual(f.read(), b"xxx")
496
497    def test_array_writes(self):
498        a = array.array(b'i', range(10))
499        n = len(a.tostring())
500        with self.open(support.TESTFN, "wb", 0) as f:
501            self.assertEqual(f.write(a), n)
502        with self.open(support.TESTFN, "wb") as f:
503            self.assertEqual(f.write(a), n)
504
505    def test_closefd(self):
506        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
507                          closefd=False)
508
509    def test_read_closed(self):
510        with self.open(support.TESTFN, "w") as f:
511            f.write("egg\n")
512        with self.open(support.TESTFN, "r") as f:
513            file = self.open(f.fileno(), "r", closefd=False)
514            self.assertEqual(file.read(), "egg\n")
515            file.seek(0)
516            file.close()
517            self.assertRaises(ValueError, file.read)
518
519    def test_no_closefd_with_filename(self):
520        # can't use closefd in combination with a file name
521        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
522
523    def test_closefd_attr(self):
524        with self.open(support.TESTFN, "wb") as f:
525            f.write(b"egg\n")
526        with self.open(support.TESTFN, "r") as f:
527            self.assertEqual(f.buffer.raw.closefd, True)
528            file = self.open(f.fileno(), "r", closefd=False)
529            self.assertEqual(file.buffer.raw.closefd, False)
530
531    def test_garbage_collection(self):
532        # FileIO objects are collected, and collecting them flushes
533        # all data to disk.
534        f = self.FileIO(support.TESTFN, "wb")
535        f.write(b"abcxxx")
536        f.f = f
537        wr = weakref.ref(f)
538        del f
539        support.gc_collect()
540        self.assertTrue(wr() is None, wr)
541        with self.open(support.TESTFN, "rb") as f:
542            self.assertEqual(f.read(), b"abcxxx")
543
544    def test_unbounded_file(self):
545        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546        zero = "/dev/zero"
547        if not os.path.exists(zero):
548            self.skipTest("{0} does not exist".format(zero))
549        if sys.maxsize > 0x7FFFFFFF:
550            self.skipTest("test can only run in a 32-bit address space")
551        if support.real_max_memuse < support._2G:
552            self.skipTest("test requires at least 2GB of memory")
553        with self.open(zero, "rb", buffering=0) as f:
554            self.assertRaises(OverflowError, f.read)
555        with self.open(zero, "rb") as f:
556            self.assertRaises(OverflowError, f.read)
557        with self.open(zero, "r") as f:
558            self.assertRaises(OverflowError, f.read)
559
560    def test_flush_error_on_close(self):
561        f = self.open(support.TESTFN, "wb", buffering=0)
562        def bad_flush():
563            raise IOError()
564        f.flush = bad_flush
565        self.assertRaises(IOError, f.close) # exception not swallowed
566
567    def test_multi_close(self):
568        f = self.open(support.TESTFN, "wb", buffering=0)
569        f.close()
570        f.close()
571        f.close()
572        self.assertRaises(ValueError, f.flush)
573
574    def test_RawIOBase_read(self):
575        # Exercise the default RawIOBase.read() implementation (which calls
576        # readinto() internally).
577        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578        self.assertEqual(rawio.read(2), b"ab")
579        self.assertEqual(rawio.read(2), b"c")
580        self.assertEqual(rawio.read(2), b"d")
581        self.assertEqual(rawio.read(2), None)
582        self.assertEqual(rawio.read(2), b"ef")
583        self.assertEqual(rawio.read(2), b"g")
584        self.assertEqual(rawio.read(2), None)
585        self.assertEqual(rawio.read(2), b"")
586
587class CIOTest(IOTest):
588    pass
589
590class PyIOTest(IOTest):
591    test_array_writes = unittest.skip(
592        "len(array.array) returns number of elements rather than bytelength"
593    )(IOTest.test_array_writes)
594
595
596class CommonBufferedTests:
597    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
598
599    def test_detach(self):
600        raw = self.MockRawIO()
601        buf = self.tp(raw)
602        self.assertIs(buf.detach(), raw)
603        self.assertRaises(ValueError, buf.detach)
604
605    def test_fileno(self):
606        rawio = self.MockRawIO()
607        bufio = self.tp(rawio)
608
609        self.assertEqual(42, bufio.fileno())
610
611    def test_no_fileno(self):
612        # XXX will we always have fileno() function? If so, kill
613        # this test. Else, write it.
614        pass
615
616    def test_invalid_args(self):
617        rawio = self.MockRawIO()
618        bufio = self.tp(rawio)
619        # Invalid whence
620        self.assertRaises(ValueError, bufio.seek, 0, -1)
621        self.assertRaises(ValueError, bufio.seek, 0, 3)
622
623    def test_override_destructor(self):
624        tp = self.tp
625        record = []
626        class MyBufferedIO(tp):
627            def __del__(self):
628                record.append(1)
629                try:
630                    f = super(MyBufferedIO, self).__del__
631                except AttributeError:
632                    pass
633                else:
634                    f()
635            def close(self):
636                record.append(2)
637                super(MyBufferedIO, self).close()
638            def flush(self):
639                record.append(3)
640                super(MyBufferedIO, self).flush()
641        rawio = self.MockRawIO()
642        bufio = MyBufferedIO(rawio)
643        writable = bufio.writable()
644        del bufio
645        support.gc_collect()
646        if writable:
647            self.assertEqual(record, [1, 2, 3])
648        else:
649            self.assertEqual(record, [1, 2])
650
651    def test_context_manager(self):
652        # Test usability as a context manager
653        rawio = self.MockRawIO()
654        bufio = self.tp(rawio)
655        def _with():
656            with bufio:
657                pass
658        _with()
659        # bufio should now be closed, and using it a second time should raise
660        # a ValueError.
661        self.assertRaises(ValueError, _with)
662
663    def test_error_through_destructor(self):
664        # Test that the exception state is not modified by a destructor,
665        # even if close() fails.
666        rawio = self.CloseFailureIO()
667        def f():
668            self.tp(rawio).xyzzy
669        with support.captured_output("stderr") as s:
670            self.assertRaises(AttributeError, f)
671        s = s.getvalue().strip()
672        if s:
673            # The destructor *may* have printed an unraisable error, check it
674            self.assertEqual(len(s.splitlines()), 1)
675            self.assertTrue(s.startswith("Exception IOError: "), s)
676            self.assertTrue(s.endswith(" ignored"), s)
677
678    def test_repr(self):
679        raw = self.MockRawIO()
680        b = self.tp(raw)
681        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
682        self.assertEqual(repr(b), "<%s>" % clsname)
683        raw.name = "dummy"
684        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
685        raw.name = b"dummy"
686        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
687
688    def test_flush_error_on_close(self):
689        raw = self.MockRawIO()
690        def bad_flush():
691            raise IOError()
692        raw.flush = bad_flush
693        b = self.tp(raw)
694        self.assertRaises(IOError, b.close) # exception not swallowed
695
696    def test_multi_close(self):
697        raw = self.MockRawIO()
698        b = self.tp(raw)
699        b.close()
700        b.close()
701        b.close()
702        self.assertRaises(ValueError, b.flush)
703
704    def test_readonly_attributes(self):
705        raw = self.MockRawIO()
706        buf = self.tp(raw)
707        x = self.MockRawIO()
708        with self.assertRaises((AttributeError, TypeError)):
709            buf.raw = x
710
711
712class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
713    read_mode = "rb"
714
715    def test_constructor(self):
716        rawio = self.MockRawIO([b"abc"])
717        bufio = self.tp(rawio)
718        bufio.__init__(rawio)
719        bufio.__init__(rawio, buffer_size=1024)
720        bufio.__init__(rawio, buffer_size=16)
721        self.assertEqual(b"abc", bufio.read())
722        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
723        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
724        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
725        rawio = self.MockRawIO([b"abc"])
726        bufio.__init__(rawio)
727        self.assertEqual(b"abc", bufio.read())
728
729    def test_read(self):
730        for arg in (None, 7):
731            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
732            bufio = self.tp(rawio)
733            self.assertEqual(b"abcdefg", bufio.read(arg))
734        # Invalid args
735        self.assertRaises(ValueError, bufio.read, -2)
736
737    def test_read1(self):
738        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
739        bufio = self.tp(rawio)
740        self.assertEqual(b"a", bufio.read(1))
741        self.assertEqual(b"b", bufio.read1(1))
742        self.assertEqual(rawio._reads, 1)
743        self.assertEqual(b"c", bufio.read1(100))
744        self.assertEqual(rawio._reads, 1)
745        self.assertEqual(b"d", bufio.read1(100))
746        self.assertEqual(rawio._reads, 2)
747        self.assertEqual(b"efg", bufio.read1(100))
748        self.assertEqual(rawio._reads, 3)
749        self.assertEqual(b"", bufio.read1(100))
750        self.assertEqual(rawio._reads, 4)
751        # Invalid args
752        self.assertRaises(ValueError, bufio.read1, -1)
753
754    def test_readinto(self):
755        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
756        bufio = self.tp(rawio)
757        b = bytearray(2)
758        self.assertEqual(bufio.readinto(b), 2)
759        self.assertEqual(b, b"ab")
760        self.assertEqual(bufio.readinto(b), 2)
761        self.assertEqual(b, b"cd")
762        self.assertEqual(bufio.readinto(b), 2)
763        self.assertEqual(b, b"ef")
764        self.assertEqual(bufio.readinto(b), 1)
765        self.assertEqual(b, b"gf")
766        self.assertEqual(bufio.readinto(b), 0)
767        self.assertEqual(b, b"gf")
768
769    def test_readlines(self):
770        def bufio():
771            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
772            return self.tp(rawio)
773        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
774        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
775        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
776
777    def test_buffering(self):
778        data = b"abcdefghi"
779        dlen = len(data)
780
781        tests = [
782            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
783            [ 100, [ 3, 3, 3],     [ dlen ]    ],
784            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
785        ]
786
787        for bufsize, buf_read_sizes, raw_read_sizes in tests:
788            rawio = self.MockFileIO(data)
789            bufio = self.tp(rawio, buffer_size=bufsize)
790            pos = 0
791            for nbytes in buf_read_sizes:
792                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
793                pos += nbytes
794            # this is mildly implementation-dependent
795            self.assertEqual(rawio.read_history, raw_read_sizes)
796
797    def test_read_non_blocking(self):
798        # Inject some None's in there to simulate EWOULDBLOCK
799        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
800        bufio = self.tp(rawio)
801        self.assertEqual(b"abcd", bufio.read(6))
802        self.assertEqual(b"e", bufio.read(1))
803        self.assertEqual(b"fg", bufio.read())
804        self.assertEqual(b"", bufio.peek(1))
805        self.assertIsNone(bufio.read())
806        self.assertEqual(b"", bufio.read())
807
808        rawio = self.MockRawIO((b"a", None, None))
809        self.assertEqual(b"a", rawio.readall())
810        self.assertIsNone(rawio.readall())
811
812    def test_read_past_eof(self):
813        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
814        bufio = self.tp(rawio)
815
816        self.assertEqual(b"abcdefg", bufio.read(9000))
817
818    def test_read_all(self):
819        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
820        bufio = self.tp(rawio)
821
822        self.assertEqual(b"abcdefg", bufio.read())
823
824    @unittest.skipUnless(threading, 'Threading required for this test.')
825    @support.requires_resource('cpu')
826    def test_threads(self):
827        try:
828            # Write out many bytes with exactly the same number of 0's,
829            # 1's... 255's. This will help us check that concurrent reading
830            # doesn't duplicate or forget contents.
831            N = 1000
832            l = list(range(256)) * N
833            random.shuffle(l)
834            s = bytes(bytearray(l))
835            with self.open(support.TESTFN, "wb") as f:
836                f.write(s)
837            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
838                bufio = self.tp(raw, 8)
839                errors = []
840                results = []
841                def f():
842                    try:
843                        # Intra-buffer read then buffer-flushing read
844                        for n in cycle([1, 19]):
845                            s = bufio.read(n)
846                            if not s:
847                                break
848                            # list.append() is atomic
849                            results.append(s)
850                    except Exception as e:
851                        errors.append(e)
852                        raise
853                threads = [threading.Thread(target=f) for x in range(20)]
854                for t in threads:
855                    t.start()
856                time.sleep(0.02) # yield
857                for t in threads:
858                    t.join()
859                self.assertFalse(errors,
860                    "the following exceptions were caught: %r" % errors)
861                s = b''.join(results)
862                for i in range(256):
863                    c = bytes(bytearray([i]))
864                    self.assertEqual(s.count(c), N)
865        finally:
866            support.unlink(support.TESTFN)
867
868    def test_misbehaved_io(self):
869        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
870        bufio = self.tp(rawio)
871        self.assertRaises(IOError, bufio.seek, 0)
872        self.assertRaises(IOError, bufio.tell)
873
874    def test_no_extraneous_read(self):
875        # Issue #9550; when the raw IO object has satisfied the read request,
876        # we should not issue any additional reads, otherwise it may block
877        # (e.g. socket).
878        bufsize = 16
879        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
880            rawio = self.MockRawIO([b"x" * n])
881            bufio = self.tp(rawio, bufsize)
882            self.assertEqual(bufio.read(n), b"x" * n)
883            # Simple case: one raw read is enough to satisfy the request.
884            self.assertEqual(rawio._extraneous_reads, 0,
885                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
886            # A more complex case where two raw reads are needed to satisfy
887            # the request.
888            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
889            bufio = self.tp(rawio, bufsize)
890            self.assertEqual(bufio.read(n), b"x" * n)
891            self.assertEqual(rawio._extraneous_reads, 0,
892                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
893
894
895class CBufferedReaderTest(BufferedReaderTest):
896    tp = io.BufferedReader
897
898    def test_constructor(self):
899        BufferedReaderTest.test_constructor(self)
900        # The allocation can succeed on 32-bit builds, e.g. with more
901        # than 2GB RAM and a 64-bit kernel.
902        if sys.maxsize > 0x7FFFFFFF:
903            rawio = self.MockRawIO()
904            bufio = self.tp(rawio)
905            self.assertRaises((OverflowError, MemoryError, ValueError),
906                bufio.__init__, rawio, sys.maxsize)
907
908    def test_initialization(self):
909        rawio = self.MockRawIO([b"abc"])
910        bufio = self.tp(rawio)
911        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
912        self.assertRaises(ValueError, bufio.read)
913        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
914        self.assertRaises(ValueError, bufio.read)
915        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
916        self.assertRaises(ValueError, bufio.read)
917
918    def test_misbehaved_io_read(self):
919        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
920        bufio = self.tp(rawio)
921        # _pyio.BufferedReader seems to implement reading different, so that
922        # checking this is not so easy.
923        self.assertRaises(IOError, bufio.read, 10)
924
925    def test_garbage_collection(self):
926        # C BufferedReader objects are collected.
927        # The Python version has __del__, so it ends into gc.garbage instead
928        rawio = self.FileIO(support.TESTFN, "w+b")
929        f = self.tp(rawio)
930        f.f = f
931        wr = weakref.ref(f)
932        del f
933        support.gc_collect()
934        self.assertTrue(wr() is None, wr)
935
936class PyBufferedReaderTest(BufferedReaderTest):
937    tp = pyio.BufferedReader
938
939
940class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
941    write_mode = "wb"
942
943    def test_constructor(self):
944        rawio = self.MockRawIO()
945        bufio = self.tp(rawio)
946        bufio.__init__(rawio)
947        bufio.__init__(rawio, buffer_size=1024)
948        bufio.__init__(rawio, buffer_size=16)
949        self.assertEqual(3, bufio.write(b"abc"))
950        bufio.flush()
951        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
952        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
953        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
954        bufio.__init__(rawio)
955        self.assertEqual(3, bufio.write(b"ghi"))
956        bufio.flush()
957        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
958
959    def test_detach_flush(self):
960        raw = self.MockRawIO()
961        buf = self.tp(raw)
962        buf.write(b"howdy!")
963        self.assertFalse(raw._write_stack)
964        buf.detach()
965        self.assertEqual(raw._write_stack, [b"howdy!"])
966
967    def test_write(self):
968        # Write to the buffered IO but don't overflow the buffer.
969        writer = self.MockRawIO()
970        bufio = self.tp(writer, 8)
971        bufio.write(b"abc")
972        self.assertFalse(writer._write_stack)
973
974    def test_write_overflow(self):
975        writer = self.MockRawIO()
976        bufio = self.tp(writer, 8)
977        contents = b"abcdefghijklmnop"
978        for n in range(0, len(contents), 3):
979            bufio.write(contents[n:n+3])
980        flushed = b"".join(writer._write_stack)
981        # At least (total - 8) bytes were implicitly flushed, perhaps more
982        # depending on the implementation.
983        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
984
985    def check_writes(self, intermediate_func):
986        # Lots of writes, test the flushed output is as expected.
987        contents = bytes(range(256)) * 1000
988        n = 0
989        writer = self.MockRawIO()
990        bufio = self.tp(writer, 13)
991        # Generator of write sizes: repeat each N 15 times then proceed to N+1
992        def gen_sizes():
993            for size in count(1):
994                for i in range(15):
995                    yield size
996        sizes = gen_sizes()
997        while n < len(contents):
998            size = min(next(sizes), len(contents) - n)
999            self.assertEqual(bufio.write(contents[n:n+size]), size)
1000            intermediate_func(bufio)
1001            n += size
1002        bufio.flush()
1003        self.assertEqual(contents,
1004            b"".join(writer._write_stack))
1005
1006    def test_writes(self):
1007        self.check_writes(lambda bufio: None)
1008
1009    def test_writes_and_flushes(self):
1010        self.check_writes(lambda bufio: bufio.flush())
1011
1012    def test_writes_and_seeks(self):
1013        def _seekabs(bufio):
1014            pos = bufio.tell()
1015            bufio.seek(pos + 1, 0)
1016            bufio.seek(pos - 1, 0)
1017            bufio.seek(pos, 0)
1018        self.check_writes(_seekabs)
1019        def _seekrel(bufio):
1020            pos = bufio.seek(0, 1)
1021            bufio.seek(+1, 1)
1022            bufio.seek(-1, 1)
1023            bufio.seek(pos, 0)
1024        self.check_writes(_seekrel)
1025
1026    def test_writes_and_truncates(self):
1027        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1028
1029    def test_write_non_blocking(self):
1030        raw = self.MockNonBlockWriterIO()
1031        bufio = self.tp(raw, 8)
1032
1033        self.assertEqual(bufio.write(b"abcd"), 4)
1034        self.assertEqual(bufio.write(b"efghi"), 5)
1035        # 1 byte will be written, the rest will be buffered
1036        raw.block_on(b"k")
1037        self.assertEqual(bufio.write(b"jklmn"), 5)
1038
1039        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1040        raw.block_on(b"0")
1041        try:
1042            bufio.write(b"opqrwxyz0123456789")
1043        except self.BlockingIOError as e:
1044            written = e.characters_written
1045        else:
1046            self.fail("BlockingIOError should have been raised")
1047        self.assertEqual(written, 16)
1048        self.assertEqual(raw.pop_written(),
1049            b"abcdefghijklmnopqrwxyz")
1050
1051        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1052        s = raw.pop_written()
1053        # Previously buffered bytes were flushed
1054        self.assertTrue(s.startswith(b"01234567A"), s)
1055
1056    def test_write_and_rewind(self):
1057        raw = io.BytesIO()
1058        bufio = self.tp(raw, 4)
1059        self.assertEqual(bufio.write(b"abcdef"), 6)
1060        self.assertEqual(bufio.tell(), 6)
1061        bufio.seek(0, 0)
1062        self.assertEqual(bufio.write(b"XY"), 2)
1063        bufio.seek(6, 0)
1064        self.assertEqual(raw.getvalue(), b"XYcdef")
1065        self.assertEqual(bufio.write(b"123456"), 6)
1066        bufio.flush()
1067        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1068
1069    def test_flush(self):
1070        writer = self.MockRawIO()
1071        bufio = self.tp(writer, 8)
1072        bufio.write(b"abc")
1073        bufio.flush()
1074        self.assertEqual(b"abc", writer._write_stack[0])
1075
1076    def test_destructor(self):
1077        writer = self.MockRawIO()
1078        bufio = self.tp(writer, 8)
1079        bufio.write(b"abc")
1080        del bufio
1081        support.gc_collect()
1082        self.assertEqual(b"abc", writer._write_stack[0])
1083
1084    def test_truncate(self):
1085        # Truncate implicitly flushes the buffer.
1086        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1087            bufio = self.tp(raw, 8)
1088            bufio.write(b"abcdef")
1089            self.assertEqual(bufio.truncate(3), 3)
1090            self.assertEqual(bufio.tell(), 6)
1091        with self.open(support.TESTFN, "rb", buffering=0) as f:
1092            self.assertEqual(f.read(), b"abc")
1093
1094    @unittest.skipUnless(threading, 'Threading required for this test.')
1095    @support.requires_resource('cpu')
1096    def test_threads(self):
1097        try:
1098            # Write out many bytes from many threads and test they were
1099            # all flushed.
1100            N = 1000
1101            contents = bytes(range(256)) * N
1102            sizes = cycle([1, 19])
1103            n = 0
1104            queue = deque()
1105            while n < len(contents):
1106                size = next(sizes)
1107                queue.append(contents[n:n+size])
1108                n += size
1109            del contents
1110            # We use a real file object because it allows us to
1111            # exercise situations where the GIL is released before
1112            # writing the buffer to the raw streams. This is in addition
1113            # to concurrency issues due to switching threads in the middle
1114            # of Python code.
1115            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1116                bufio = self.tp(raw, 8)
1117                errors = []
1118                def f():
1119                    try:
1120                        while True:
1121                            try:
1122                                s = queue.popleft()
1123                            except IndexError:
1124                                return
1125                            bufio.write(s)
1126                    except Exception as e:
1127                        errors.append(e)
1128                        raise
1129                threads = [threading.Thread(target=f) for x in range(20)]
1130                for t in threads:
1131                    t.start()
1132                time.sleep(0.02) # yield
1133                for t in threads:
1134                    t.join()
1135                self.assertFalse(errors,
1136                    "the following exceptions were caught: %r" % errors)
1137                bufio.close()
1138            with self.open(support.TESTFN, "rb") as f:
1139                s = f.read()
1140            for i in range(256):
1141                self.assertEqual(s.count(bytes([i])), N)
1142        finally:
1143            support.unlink(support.TESTFN)
1144
1145    def test_misbehaved_io(self):
1146        rawio = self.MisbehavedRawIO()
1147        bufio = self.tp(rawio, 5)
1148        self.assertRaises(IOError, bufio.seek, 0)
1149        self.assertRaises(IOError, bufio.tell)
1150        self.assertRaises(IOError, bufio.write, b"abcdef")
1151
1152    def test_max_buffer_size_deprecation(self):
1153        with support.check_warnings(("max_buffer_size is deprecated",
1154                                     DeprecationWarning)):
1155            self.tp(self.MockRawIO(), 8, 12)
1156
1157
1158class CBufferedWriterTest(BufferedWriterTest):
1159    tp = io.BufferedWriter
1160
1161    def test_constructor(self):
1162        BufferedWriterTest.test_constructor(self)
1163        # The allocation can succeed on 32-bit builds, e.g. with more
1164        # than 2GB RAM and a 64-bit kernel.
1165        if sys.maxsize > 0x7FFFFFFF:
1166            rawio = self.MockRawIO()
1167            bufio = self.tp(rawio)
1168            self.assertRaises((OverflowError, MemoryError, ValueError),
1169                bufio.__init__, rawio, sys.maxsize)
1170
1171    def test_initialization(self):
1172        rawio = self.MockRawIO()
1173        bufio = self.tp(rawio)
1174        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1175        self.assertRaises(ValueError, bufio.write, b"def")
1176        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1177        self.assertRaises(ValueError, bufio.write, b"def")
1178        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1179        self.assertRaises(ValueError, bufio.write, b"def")
1180
1181    def test_garbage_collection(self):
1182        # C BufferedWriter objects are collected, and collecting them flushes
1183        # all data to disk.
1184        # The Python version has __del__, so it ends into gc.garbage instead
1185        rawio = self.FileIO(support.TESTFN, "w+b")
1186        f = self.tp(rawio)
1187        f.write(b"123xxx")
1188        f.x = f
1189        wr = weakref.ref(f)
1190        del f
1191        support.gc_collect()
1192        self.assertTrue(wr() is None, wr)
1193        with self.open(support.TESTFN, "rb") as f:
1194            self.assertEqual(f.read(), b"123xxx")
1195
1196
1197class PyBufferedWriterTest(BufferedWriterTest):
1198    tp = pyio.BufferedWriter
1199
1200class BufferedRWPairTest(unittest.TestCase):
1201
1202    def test_constructor(self):
1203        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1204        self.assertFalse(pair.closed)
1205
1206    def test_detach(self):
1207        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1208        self.assertRaises(self.UnsupportedOperation, pair.detach)
1209
1210    def test_constructor_max_buffer_size_deprecation(self):
1211        with support.check_warnings(("max_buffer_size is deprecated",
1212                                     DeprecationWarning)):
1213            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1214
1215    def test_constructor_with_not_readable(self):
1216        class NotReadable(MockRawIO):
1217            def readable(self):
1218                return False
1219
1220        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1221
1222    def test_constructor_with_not_writeable(self):
1223        class NotWriteable(MockRawIO):
1224            def writable(self):
1225                return False
1226
1227        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1228
1229    def test_read(self):
1230        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1231
1232        self.assertEqual(pair.read(3), b"abc")
1233        self.assertEqual(pair.read(1), b"d")
1234        self.assertEqual(pair.read(), b"ef")
1235        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1236        self.assertEqual(pair.read(None), b"abc")
1237
1238    def test_readlines(self):
1239        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1240        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1241        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1242        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
1243
1244    def test_read1(self):
1245        # .read1() is delegated to the underlying reader object, so this test
1246        # can be shallow.
1247        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1248
1249        self.assertEqual(pair.read1(3), b"abc")
1250
1251    def test_readinto(self):
1252        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1253
1254        data = bytearray(5)
1255        self.assertEqual(pair.readinto(data), 5)
1256        self.assertEqual(data, b"abcde")
1257
1258    def test_write(self):
1259        w = self.MockRawIO()
1260        pair = self.tp(self.MockRawIO(), w)
1261
1262        pair.write(b"abc")
1263        pair.flush()
1264        pair.write(b"def")
1265        pair.flush()
1266        self.assertEqual(w._write_stack, [b"abc", b"def"])
1267
1268    def test_peek(self):
1269        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1270
1271        self.assertTrue(pair.peek(3).startswith(b"abc"))
1272        self.assertEqual(pair.read(3), b"abc")
1273
1274    def test_readable(self):
1275        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1276        self.assertTrue(pair.readable())
1277
1278    def test_writeable(self):
1279        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1280        self.assertTrue(pair.writable())
1281
1282    def test_seekable(self):
1283        # BufferedRWPairs are never seekable, even if their readers and writers
1284        # are.
1285        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1286        self.assertFalse(pair.seekable())
1287
1288    # .flush() is delegated to the underlying writer object and has been
1289    # tested in the test_write method.
1290
1291    def test_close_and_closed(self):
1292        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1293        self.assertFalse(pair.closed)
1294        pair.close()
1295        self.assertTrue(pair.closed)
1296
1297    def test_isatty(self):
1298        class SelectableIsAtty(MockRawIO):
1299            def __init__(self, isatty):
1300                MockRawIO.__init__(self)
1301                self._isatty = isatty
1302
1303            def isatty(self):
1304                return self._isatty
1305
1306        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1307        self.assertFalse(pair.isatty())
1308
1309        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1310        self.assertTrue(pair.isatty())
1311
1312        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1313        self.assertTrue(pair.isatty())
1314
1315        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1316        self.assertTrue(pair.isatty())
1317
1318class CBufferedRWPairTest(BufferedRWPairTest):
1319    tp = io.BufferedRWPair
1320
1321class PyBufferedRWPairTest(BufferedRWPairTest):
1322    tp = pyio.BufferedRWPair
1323
1324
1325class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1326    read_mode = "rb+"
1327    write_mode = "wb+"
1328
1329    def test_constructor(self):
1330        BufferedReaderTest.test_constructor(self)
1331        BufferedWriterTest.test_constructor(self)
1332
1333    def test_read_and_write(self):
1334        raw = self.MockRawIO((b"asdf", b"ghjk"))
1335        rw = self.tp(raw, 8)
1336
1337        self.assertEqual(b"as", rw.read(2))
1338        rw.write(b"ddd")
1339        rw.write(b"eee")
1340        self.assertFalse(raw._write_stack) # Buffer writes
1341        self.assertEqual(b"ghjk", rw.read())
1342        self.assertEqual(b"dddeee", raw._write_stack[0])
1343
1344    def test_seek_and_tell(self):
1345        raw = self.BytesIO(b"asdfghjkl")
1346        rw = self.tp(raw)
1347
1348        self.assertEqual(b"as", rw.read(2))
1349        self.assertEqual(2, rw.tell())
1350        rw.seek(0, 0)
1351        self.assertEqual(b"asdf", rw.read(4))
1352
1353        rw.write(b"asdf")
1354        rw.seek(0, 0)
1355        self.assertEqual(b"asdfasdfl", rw.read())
1356        self.assertEqual(9, rw.tell())
1357        rw.seek(-4, 2)
1358        self.assertEqual(5, rw.tell())
1359        rw.seek(2, 1)
1360        self.assertEqual(7, rw.tell())
1361        self.assertEqual(b"fl", rw.read(11))
1362        self.assertRaises(TypeError, rw.seek, 0.0)
1363
1364    def check_flush_and_read(self, read_func):
1365        raw = self.BytesIO(b"abcdefghi")
1366        bufio = self.tp(raw)
1367
1368        self.assertEqual(b"ab", read_func(bufio, 2))
1369        bufio.write(b"12")
1370        self.assertEqual(b"ef", read_func(bufio, 2))
1371        self.assertEqual(6, bufio.tell())
1372        bufio.flush()
1373        self.assertEqual(6, bufio.tell())
1374        self.assertEqual(b"ghi", read_func(bufio))
1375        raw.seek(0, 0)
1376        raw.write(b"XYZ")
1377        # flush() resets the read buffer
1378        bufio.flush()
1379        bufio.seek(0, 0)
1380        self.assertEqual(b"XYZ", read_func(bufio, 3))
1381
1382    def test_flush_and_read(self):
1383        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1384
1385    def test_flush_and_readinto(self):
1386        def _readinto(bufio, n=-1):
1387            b = bytearray(n if n >= 0 else 9999)
1388            n = bufio.readinto(b)
1389            return bytes(b[:n])
1390        self.check_flush_and_read(_readinto)
1391
1392    def test_flush_and_peek(self):
1393        def _peek(bufio, n=-1):
1394            # This relies on the fact that the buffer can contain the whole
1395            # raw stream, otherwise peek() can return less.
1396            b = bufio.peek(n)
1397            if n != -1:
1398                b = b[:n]
1399            bufio.seek(len(b), 1)
1400            return b
1401        self.check_flush_and_read(_peek)
1402
1403    def test_flush_and_write(self):
1404        raw = self.BytesIO(b"abcdefghi")
1405        bufio = self.tp(raw)
1406
1407        bufio.write(b"123")
1408        bufio.flush()
1409        bufio.write(b"45")
1410        bufio.flush()
1411        bufio.seek(0, 0)
1412        self.assertEqual(b"12345fghi", raw.getvalue())
1413        self.assertEqual(b"12345fghi", bufio.read())
1414
1415    def test_threads(self):
1416        BufferedReaderTest.test_threads(self)
1417        BufferedWriterTest.test_threads(self)
1418
1419    def test_writes_and_peek(self):
1420        def _peek(bufio):
1421            bufio.peek(1)
1422        self.check_writes(_peek)
1423        def _peek(bufio):
1424            pos = bufio.tell()
1425            bufio.seek(-1, 1)
1426            bufio.peek(1)
1427            bufio.seek(pos, 0)
1428        self.check_writes(_peek)
1429
1430    def test_writes_and_reads(self):
1431        def _read(bufio):
1432            bufio.seek(-1, 1)
1433            bufio.read(1)
1434        self.check_writes(_read)
1435
1436    def test_writes_and_read1s(self):
1437        def _read1(bufio):
1438            bufio.seek(-1, 1)
1439            bufio.read1(1)
1440        self.check_writes(_read1)
1441
1442    def test_writes_and_readintos(self):
1443        def _read(bufio):
1444            bufio.seek(-1, 1)
1445            bufio.readinto(bytearray(1))
1446        self.check_writes(_read)
1447
1448    def test_write_after_readahead(self):
1449        # Issue #6629: writing after the buffer was filled by readahead should
1450        # first rewind the raw stream.
1451        for overwrite_size in [1, 5]:
1452            raw = self.BytesIO(b"A" * 10)
1453            bufio = self.tp(raw, 4)
1454            # Trigger readahead
1455            self.assertEqual(bufio.read(1), b"A")
1456            self.assertEqual(bufio.tell(), 1)
1457            # Overwriting should rewind the raw stream if it needs so
1458            bufio.write(b"B" * overwrite_size)
1459            self.assertEqual(bufio.tell(), overwrite_size + 1)
1460            # If the write size was smaller than the buffer size, flush() and
1461            # check that rewind happens.
1462            bufio.flush()
1463            self.assertEqual(bufio.tell(), overwrite_size + 1)
1464            s = raw.getvalue()
1465            self.assertEqual(s,
1466                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1467
1468    def test_write_rewind_write(self):
1469        # Various combinations of reading / writing / seeking backwards / writing again
1470        def mutate(bufio, pos1, pos2):
1471            assert pos2 >= pos1
1472            # Fill the buffer
1473            bufio.seek(pos1)
1474            bufio.read(pos2 - pos1)
1475            bufio.write(b'\x02')
1476            # This writes earlier than the previous write, but still inside
1477            # the buffer.
1478            bufio.seek(pos1)
1479            bufio.write(b'\x01')
1480
1481        b = b"\x80\x81\x82\x83\x84"
1482        for i in range(0, len(b)):
1483            for j in range(i, len(b)):
1484                raw = self.BytesIO(b)
1485                bufio = self.tp(raw, 100)
1486                mutate(bufio, i, j)
1487                bufio.flush()
1488                expected = bytearray(b)
1489                expected[j] = 2
1490                expected[i] = 1
1491                self.assertEqual(raw.getvalue(), expected,
1492                                 "failed result for i=%d, j=%d" % (i, j))
1493
1494    def test_truncate_after_read_or_write(self):
1495        raw = self.BytesIO(b"A" * 10)
1496        bufio = self.tp(raw, 100)
1497        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1498        self.assertEqual(bufio.truncate(), 2)
1499        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1500        self.assertEqual(bufio.truncate(), 4)
1501
1502    def test_misbehaved_io(self):
1503        BufferedReaderTest.test_misbehaved_io(self)
1504        BufferedWriterTest.test_misbehaved_io(self)
1505
1506class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1507    tp = io.BufferedRandom
1508
1509    def test_constructor(self):
1510        BufferedRandomTest.test_constructor(self)
1511        # The allocation can succeed on 32-bit builds, e.g. with more
1512        # than 2GB RAM and a 64-bit kernel.
1513        if sys.maxsize > 0x7FFFFFFF:
1514            rawio = self.MockRawIO()
1515            bufio = self.tp(rawio)
1516            self.assertRaises((OverflowError, MemoryError, ValueError),
1517                bufio.__init__, rawio, sys.maxsize)
1518
1519    def test_garbage_collection(self):
1520        CBufferedReaderTest.test_garbage_collection(self)
1521        CBufferedWriterTest.test_garbage_collection(self)
1522
1523class PyBufferedRandomTest(BufferedRandomTest):
1524    tp = pyio.BufferedRandom
1525
1526
1527# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1528# properties:
1529#   - A single output character can correspond to many bytes of input.
1530#   - The number of input bytes to complete the character can be
1531#     undetermined until the last input byte is received.
1532#   - The number of input bytes can vary depending on previous input.
1533#   - A single input byte can correspond to many characters of output.
1534#   - The number of output characters can be undetermined until the
1535#     last input byte is received.
1536#   - The number of output characters can vary depending on previous input.
1537
1538class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1539    """
1540    For testing seek/tell behavior with a stateful, buffering decoder.
1541
1542    Input is a sequence of words.  Words may be fixed-length (length set
1543    by input) or variable-length (period-terminated).  In variable-length
1544    mode, extra periods are ignored.  Possible words are:
1545      - 'i' followed by a number sets the input length, I (maximum 99).
1546        When I is set to 0, words are space-terminated.
1547      - 'o' followed by a number sets the output length, O (maximum 99).
1548      - Any other word is converted into a word followed by a period on
1549        the output.  The output word consists of the input word truncated
1550        or padded out with hyphens to make its length equal to O.  If O
1551        is 0, the word is output verbatim without truncating or padding.
1552    I and O are initially set to 1.  When I changes, any buffered input is
1553    re-scanned according to the new I.  EOF also terminates the last word.
1554    """
1555
1556    def __init__(self, errors='strict'):
1557        codecs.IncrementalDecoder.__init__(self, errors)
1558        self.reset()
1559
1560    def __repr__(self):
1561        return '<SID %x>' % id(self)
1562
1563    def reset(self):
1564        self.i = 1
1565        self.o = 1
1566        self.buffer = bytearray()
1567
1568    def getstate(self):
1569        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1570        return bytes(self.buffer), i*100 + o
1571
1572    def setstate(self, state):
1573        buffer, io = state
1574        self.buffer = bytearray(buffer)
1575        i, o = divmod(io, 100)
1576        self.i, self.o = i ^ 1, o ^ 1
1577
1578    def decode(self, input, final=False):
1579        output = ''
1580        for b in input:
1581            if self.i == 0: # variable-length, terminated with period
1582                if b == '.':
1583                    if self.buffer:
1584                        output += self.process_word()
1585                else:
1586                    self.buffer.append(b)
1587            else: # fixed-length, terminate after self.i bytes
1588                self.buffer.append(b)
1589                if len(self.buffer) == self.i:
1590                    output += self.process_word()
1591        if final and self.buffer: # EOF terminates the last word
1592            output += self.process_word()
1593        return output
1594
1595    def process_word(self):
1596        output = ''
1597        if self.buffer[0] == ord('i'):
1598            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1599        elif self.buffer[0] == ord('o'):
1600            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1601        else:
1602            output = self.buffer.decode('ascii')
1603            if len(output) < self.o:
1604                output += '-'*self.o # pad out with hyphens
1605            if self.o:
1606                output = output[:self.o] # truncate to output length
1607            output += '.'
1608        self.buffer = bytearray()
1609        return output
1610
1611    codecEnabled = False
1612
1613    @classmethod
1614    def lookupTestDecoder(cls, name):
1615        if cls.codecEnabled and name == 'test_decoder':
1616            latin1 = codecs.lookup('latin-1')
1617            return codecs.CodecInfo(
1618                name='test_decoder', encode=latin1.encode, decode=None,
1619                incrementalencoder=None,
1620                streamreader=None, streamwriter=None,
1621                incrementaldecoder=cls)
1622
1623# Register the previous decoder for testing.
1624# Disabled by default, tests will enable it.
1625codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1626
1627
1628class StatefulIncrementalDecoderTest(unittest.TestCase):
1629    """
1630    Make sure the StatefulIncrementalDecoder actually works.
1631    """
1632
1633    test_cases = [
1634        # I=1, O=1 (fixed-length input == fixed-length output)
1635        (b'abcd', False, 'a.b.c.d.'),
1636        # I=0, O=0 (variable-length input, variable-length output)
1637        (b'oiabcd', True, 'abcd.'),
1638        # I=0, O=0 (should ignore extra periods)
1639        (b'oi...abcd...', True, 'abcd.'),
1640        # I=0, O=6 (variable-length input, fixed-length output)
1641        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1642        # I=2, O=6 (fixed-length input < fixed-length output)
1643        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1644        # I=6, O=3 (fixed-length input > fixed-length output)
1645        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1646        # I=0, then 3; O=29, then 15 (with longer output)
1647        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1648         'a----------------------------.' +
1649         'b----------------------------.' +
1650         'cde--------------------------.' +
1651         'abcdefghijabcde.' +
1652         'a.b------------.' +
1653         '.c.------------.' +
1654         'd.e------------.' +
1655         'k--------------.' +
1656         'l--------------.' +
1657         'm--------------.')
1658    ]
1659
1660    def test_decoder(self):
1661        # Try a few one-shot test cases.
1662        for input, eof, output in self.test_cases:
1663            d = StatefulIncrementalDecoder()
1664            self.assertEqual(d.decode(input, eof), output)
1665
1666        # Also test an unfinished decode, followed by forcing EOF.
1667        d = StatefulIncrementalDecoder()
1668        self.assertEqual(d.decode(b'oiabcd'), '')
1669        self.assertEqual(d.decode(b'', 1), 'abcd.')
1670
1671class TextIOWrapperTest(unittest.TestCase):
1672
1673    def setUp(self):
1674        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1675        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1676        support.unlink(support.TESTFN)
1677
1678    def tearDown(self):
1679        support.unlink(support.TESTFN)
1680
1681    def test_constructor(self):
1682        r = self.BytesIO(b"\xc3\xa9\n\n")
1683        b = self.BufferedReader(r, 1000)
1684        t = self.TextIOWrapper(b)
1685        t.__init__(b, encoding="latin1", newline="\r\n")
1686        self.assertEqual(t.encoding, "latin1")
1687        self.assertEqual(t.line_buffering, False)
1688        t.__init__(b, encoding="utf8", line_buffering=True)
1689        self.assertEqual(t.encoding, "utf8")
1690        self.assertEqual(t.line_buffering, True)
1691        self.assertEqual("\xe9\n", t.readline())
1692        self.assertRaises(TypeError, t.__init__, b, newline=42)
1693        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1694
1695    def test_detach(self):
1696        r = self.BytesIO()
1697        b = self.BufferedWriter(r)
1698        t = self.TextIOWrapper(b)
1699        self.assertIs(t.detach(), b)
1700
1701        t = self.TextIOWrapper(b, encoding="ascii")
1702        t.write("howdy")
1703        self.assertFalse(r.getvalue())
1704        t.detach()
1705        self.assertEqual(r.getvalue(), b"howdy")
1706        self.assertRaises(ValueError, t.detach)
1707
1708    def test_repr(self):
1709        raw = self.BytesIO("hello".encode("utf-8"))
1710        b = self.BufferedReader(raw)
1711        t = self.TextIOWrapper(b, encoding="utf-8")
1712        modname = self.TextIOWrapper.__module__
1713        self.assertEqual(repr(t),
1714                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1715        raw.name = "dummy"
1716        self.assertEqual(repr(t),
1717                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1718        raw.name = b"dummy"
1719        self.assertEqual(repr(t),
1720                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1721
1722    def test_line_buffering(self):
1723        r = self.BytesIO()
1724        b = self.BufferedWriter(r, 1000)
1725        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1726        t.write("X")
1727        self.assertEqual(r.getvalue(), b"")  # No flush happened
1728        t.write("Y\nZ")
1729        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
1730        t.write("A\rB")
1731        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1732
1733    def test_encoding(self):
1734        # Check the encoding attribute is always set, and valid
1735        b = self.BytesIO()
1736        t = self.TextIOWrapper(b, encoding="utf8")
1737        self.assertEqual(t.encoding, "utf8")
1738        t = self.TextIOWrapper(b)
1739        self.assertTrue(t.encoding is not None)
1740        codecs.lookup(t.encoding)
1741
1742    def test_encoding_errors_reading(self):
1743        # (1) default
1744        b = self.BytesIO(b"abc\n\xff\n")
1745        t = self.TextIOWrapper(b, encoding="ascii")
1746        self.assertRaises(UnicodeError, t.read)
1747        # (2) explicit strict
1748        b = self.BytesIO(b"abc\n\xff\n")
1749        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1750        self.assertRaises(UnicodeError, t.read)
1751        # (3) ignore
1752        b = self.BytesIO(b"abc\n\xff\n")
1753        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
1754        self.assertEqual(t.read(), "abc\n\n")
1755        # (4) replace
1756        b = self.BytesIO(b"abc\n\xff\n")
1757        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1758        self.assertEqual(t.read(), "abc\n\ufffd\n")
1759
1760    def test_encoding_errors_writing(self):
1761        # (1) default
1762        b = self.BytesIO()
1763        t = self.TextIOWrapper(b, encoding="ascii")
1764        self.assertRaises(UnicodeError, t.write, "\xff")
1765        # (2) explicit strict
1766        b = self.BytesIO()
1767        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1768        self.assertRaises(UnicodeError, t.write, "\xff")
1769        # (3) ignore
1770        b = self.BytesIO()
1771        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
1772                             newline="\n")
1773        t.write("abc\xffdef\n")
1774        t.flush()
1775        self.assertEqual(b.getvalue(), b"abcdef\n")
1776        # (4) replace
1777        b = self.BytesIO()
1778        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
1779                             newline="\n")
1780        t.write("abc\xffdef\n")
1781        t.flush()
1782        self.assertEqual(b.getvalue(), b"abc?def\n")
1783
1784    def test_newlines(self):
1785        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1786
1787        tests = [
1788            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1789            [ '', input_lines ],
1790            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1791            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1792            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1793        ]
1794        encodings = (
1795            'utf-8', 'latin-1',
1796            'utf-16', 'utf-16-le', 'utf-16-be',
1797            'utf-32', 'utf-32-le', 'utf-32-be',
1798        )
1799
1800        # Try a range of buffer sizes to test the case where \r is the last
1801        # character in TextIOWrapper._pending_line.
1802        for encoding in encodings:
1803            # XXX: str.encode() should return bytes
1804            data = bytes(''.join(input_lines).encode(encoding))
1805            for do_reads in (False, True):
1806                for bufsize in range(1, 10):
1807                    for newline, exp_lines in tests:
1808                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1809                        textio = self.TextIOWrapper(bufio, newline=newline,
1810                                                  encoding=encoding)
1811                        if do_reads:
1812                            got_lines = []
1813                            while True:
1814                                c2 = textio.read(2)
1815                                if c2 == '':
1816                                    break
1817                                self.assertEqual(len(c2), 2)
1818                                got_lines.append(c2 + textio.readline())
1819                        else:
1820                            got_lines = list(textio)
1821
1822                        for got_line, exp_line in zip(got_lines, exp_lines):
1823                            self.assertEqual(got_line, exp_line)
1824                        self.assertEqual(len(got_lines), len(exp_lines))
1825
1826    def test_newlines_input(self):
1827        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1828        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1829        for newline, expected in [
1830            (None, normalized.decode("ascii").splitlines(True)),
1831            ("", testdata.decode("ascii").splitlines(True)),
1832            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1833            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1834            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1835            ]:
1836            buf = self.BytesIO(testdata)
1837            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1838            self.assertEqual(txt.readlines(), expected)
1839            txt.seek(0)
1840            self.assertEqual(txt.read(), "".join(expected))
1841
1842    def test_newlines_output(self):
1843        testdict = {
1844            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1845            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1846            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1847            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1848            }
1849        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1850        for newline, expected in tests:
1851            buf = self.BytesIO()
1852            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1853            txt.write("AAA\nB")
1854            txt.write("BB\nCCC\n")
1855            txt.write("X\rY\r\nZ")
1856            txt.flush()
1857            self.assertEqual(buf.closed, False)
1858            self.assertEqual(buf.getvalue(), expected)
1859
1860    def test_destructor(self):
1861        l = []
1862        base = self.BytesIO
1863        class MyBytesIO(base):
1864            def close(self):
1865                l.append(self.getvalue())
1866                base.close(self)
1867        b = MyBytesIO()
1868        t = self.TextIOWrapper(b, encoding="ascii")
1869        t.write("abc")
1870        del t
1871        support.gc_collect()
1872        self.assertEqual([b"abc"], l)
1873
1874    def test_override_destructor(self):
1875        record = []
1876        class MyTextIO(self.TextIOWrapper):
1877            def __del__(self):
1878                record.append(1)
1879                try:
1880                    f = super(MyTextIO, self).__del__
1881                except AttributeError:
1882                    pass
1883                else:
1884                    f()
1885            def close(self):
1886                record.append(2)
1887                super(MyTextIO, self).close()
1888            def flush(self):
1889                record.append(3)
1890                super(MyTextIO, self).flush()
1891        b = self.BytesIO()
1892        t = MyTextIO(b, encoding="ascii")
1893        del t
1894        support.gc_collect()
1895        self.assertEqual(record, [1, 2, 3])
1896
1897    def test_error_through_destructor(self):
1898        # Test that the exception state is not modified by a destructor,
1899        # even if close() fails.
1900        rawio = self.CloseFailureIO()
1901        def f():
1902            self.TextIOWrapper(rawio).xyzzy
1903        with support.captured_output("stderr") as s:
1904            self.assertRaises(AttributeError, f)
1905        s = s.getvalue().strip()
1906        if s:
1907            # The destructor *may* have printed an unraisable error, check it
1908            self.assertEqual(len(s.splitlines()), 1)
1909            self.assertTrue(s.startswith("Exception IOError: "), s)
1910            self.assertTrue(s.endswith(" ignored"), s)
1911
1912    # Systematic tests of the text I/O API
1913
1914    def test_basic_io(self):
1915        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1916            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1917                f = self.open(support.TESTFN, "w+", encoding=enc)
1918                f._CHUNK_SIZE = chunksize
1919                self.assertEqual(f.write("abc"), 3)
1920                f.close()
1921                f = self.open(support.TESTFN, "r+", encoding=enc)
1922                f._CHUNK_SIZE = chunksize
1923                self.assertEqual(f.tell(), 0)
1924                self.assertEqual(f.read(), "abc")
1925                cookie = f.tell()
1926                self.assertEqual(f.seek(0), 0)
1927                self.assertEqual(f.read(None), "abc")
1928                f.seek(0)
1929                self.assertEqual(f.read(2), "ab")
1930                self.assertEqual(f.read(1), "c")
1931                self.assertEqual(f.read(1), "")
1932                self.assertEqual(f.read(), "")
1933                self.assertEqual(f.tell(), cookie)
1934                self.assertEqual(f.seek(0), 0)
1935                self.assertEqual(f.seek(0, 2), cookie)
1936                self.assertEqual(f.write("def"), 3)
1937                self.assertEqual(f.seek(cookie), cookie)
1938                self.assertEqual(f.read(), "def")
1939                if enc.startswith("utf"):
1940                    self.multi_line_test(f, enc)
1941                f.close()
1942
1943    def multi_line_test(self, f, enc):
1944        f.seek(0)
1945        f.truncate()
1946        sample = "s\xff\u0fff\uffff"
1947        wlines = []
1948        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1949            chars = []
1950            for i in range(size):
1951                chars.append(sample[i % len(sample)])
1952            line = "".join(chars) + "\n"
1953            wlines.append((f.tell(), line))
1954            f.write(line)
1955        f.seek(0)
1956        rlines = []
1957        while True:
1958            pos = f.tell()
1959            line = f.readline()
1960            if not line:
1961                break
1962            rlines.append((pos, line))
1963        self.assertEqual(rlines, wlines)
1964
1965    def test_telling(self):
1966        f = self.open(support.TESTFN, "w+", encoding="utf8")
1967        p0 = f.tell()
1968        f.write("\xff\n")
1969        p1 = f.tell()
1970        f.write("\xff\n")
1971        p2 = f.tell()
1972        f.seek(0)
1973        self.assertEqual(f.tell(), p0)
1974        self.assertEqual(f.readline(), "\xff\n")
1975        self.assertEqual(f.tell(), p1)
1976        self.assertEqual(f.readline(), "\xff\n")
1977        self.assertEqual(f.tell(), p2)
1978        f.seek(0)
1979        for line in f:
1980            self.assertEqual(line, "\xff\n")
1981            self.assertRaises(IOError, f.tell)
1982        self.assertEqual(f.tell(), p2)
1983        f.close()
1984
1985    def test_seeking(self):
1986        chunk_size = _default_chunk_size()
1987        prefix_size = chunk_size - 2
1988        u_prefix = "a" * prefix_size
1989        prefix = bytes(u_prefix.encode("utf-8"))
1990        self.assertEqual(len(u_prefix), len(prefix))
1991        u_suffix = "\u8888\n"
1992        suffix = bytes(u_suffix.encode("utf-8"))
1993        line = prefix + suffix
1994        f = self.open(support.TESTFN, "wb")
1995        f.write(line*2)
1996        f.close()
1997        f = self.open(support.TESTFN, "r", encoding="utf-8")
1998        s = f.read(prefix_size)
1999        self.assertEqual(s, prefix.decode("ascii"))
2000        self.assertEqual(f.tell(), prefix_size)
2001        self.assertEqual(f.readline(), u_suffix)
2002
2003    def test_seeking_too(self):
2004        # Regression test for a specific bug
2005        data = b'\xe0\xbf\xbf\n'
2006        f = self.open(support.TESTFN, "wb")
2007        f.write(data)
2008        f.close()
2009        f = self.open(support.TESTFN, "r", encoding="utf-8")
2010        f._CHUNK_SIZE  # Just test that it exists
2011        f._CHUNK_SIZE = 2
2012        f.readline()
2013        f.tell()
2014
2015    def test_seek_and_tell(self):
2016        #Test seek/tell using the StatefulIncrementalDecoder.
2017        # Make test faster by doing smaller seeks
2018        CHUNK_SIZE = 128
2019
2020        def test_seek_and_tell_with_data(data, min_pos=0):
2021            """Tell/seek to various points within a data stream and ensure
2022            that the decoded data returned by read() is consistent."""
2023            f = self.open(support.TESTFN, 'wb')
2024            f.write(data)
2025            f.close()
2026            f = self.open(support.TESTFN, encoding='test_decoder')
2027            f._CHUNK_SIZE = CHUNK_SIZE
2028            decoded = f.read()
2029            f.close()
2030
2031            for i in range(min_pos, len(decoded) + 1): # seek positions
2032                for j in [1, 5, len(decoded) - i]: # read lengths
2033                    f = self.open(support.TESTFN, encoding='test_decoder')
2034                    self.assertEqual(f.read(i), decoded[:i])
2035                    cookie = f.tell()
2036                    self.assertEqual(f.read(j), decoded[i:i + j])
2037                    f.seek(cookie)
2038                    self.assertEqual(f.read(), decoded[i:])
2039                    f.close()
2040
2041        # Enable the test decoder.
2042        StatefulIncrementalDecoder.codecEnabled = 1
2043
2044        # Run the tests.
2045        try:
2046            # Try each test case.
2047            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2048                test_seek_and_tell_with_data(input)
2049
2050            # Position each test case so that it crosses a chunk boundary.
2051            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2052                offset = CHUNK_SIZE - len(input)//2
2053                prefix = b'.'*offset
2054                # Don't bother seeking into the prefix (takes too long).
2055                min_pos = offset*2
2056                test_seek_and_tell_with_data(prefix + input, min_pos)
2057
2058        # Ensure our test decoder won't interfere with subsequent tests.
2059        finally:
2060            StatefulIncrementalDecoder.codecEnabled = 0
2061
2062    def test_encoded_writes(self):
2063        data = "1234567890"
2064        tests = ("utf-16",
2065                 "utf-16-le",
2066                 "utf-16-be",
2067                 "utf-32",
2068                 "utf-32-le",
2069                 "utf-32-be")
2070        for encoding in tests:
2071            buf = self.BytesIO()
2072            f = self.TextIOWrapper(buf, encoding=encoding)
2073            # Check if the BOM is written only once (see issue1753).
2074            f.write(data)
2075            f.write(data)
2076            f.seek(0)
2077            self.assertEqual(f.read(), data * 2)
2078            f.seek(0)
2079            self.assertEqual(f.read(), data * 2)
2080            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2081
2082    def test_unreadable(self):
2083        class UnReadable(self.BytesIO):
2084            def readable(self):
2085                return False
2086        txt = self.TextIOWrapper(UnReadable())
2087        self.assertRaises(IOError, txt.read)
2088
2089    def test_read_one_by_one(self):
2090        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2091        reads = ""
2092        while True:
2093            c = txt.read(1)
2094            if not c:
2095                break
2096            reads += c
2097        self.assertEqual(reads, "AA\nBB")
2098
2099    def test_readlines(self):
2100        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2101        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2102        txt.seek(0)
2103        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2104        txt.seek(0)
2105        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2106
2107    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2108    def test_read_by_chunk(self):
2109        # make sure "\r\n" straddles 128 char boundary.
2110        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2111        reads = ""
2112        while True:
2113            c = txt.read(128)
2114            if not c:
2115                break
2116            reads += c
2117        self.assertEqual(reads, "A"*127+"\nB")
2118
2119    def test_issue1395_1(self):
2120        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2121
2122        # read one char at a time
2123        reads = ""
2124        while True:
2125            c = txt.read(1)
2126            if not c:
2127                break
2128            reads += c
2129        self.assertEqual(reads, self.normalized)
2130
2131    def test_issue1395_2(self):
2132        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2133        txt._CHUNK_SIZE = 4
2134
2135        reads = ""
2136        while True:
2137            c = txt.read(4)
2138            if not c:
2139                break
2140            reads += c
2141        self.assertEqual(reads, self.normalized)
2142
2143    def test_issue1395_3(self):
2144        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2145        txt._CHUNK_SIZE = 4
2146
2147        reads = txt.read(4)
2148        reads += txt.read(4)
2149        reads += txt.readline()
2150        reads += txt.readline()
2151        reads += txt.readline()
2152        self.assertEqual(reads, self.normalized)
2153
2154    def test_issue1395_4(self):
2155        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2156        txt._CHUNK_SIZE = 4
2157
2158        reads = txt.read(4)
2159        reads += txt.read()
2160        self.assertEqual(reads, self.normalized)
2161
2162    def test_issue1395_5(self):
2163        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2164        txt._CHUNK_SIZE = 4
2165
2166        reads = txt.read(4)
2167        pos = txt.tell()
2168        txt.seek(0)
2169        txt.seek(pos)
2170        self.assertEqual(txt.read(4), "BBB\n")
2171
2172    def test_issue2282(self):
2173        buffer = self.BytesIO(self.testdata)
2174        txt = self.TextIOWrapper(buffer, encoding="ascii")
2175
2176        self.assertEqual(buffer.seekable(), txt.seekable())
2177
2178    def test_append_bom(self):
2179        # The BOM is not written again when appending to a non-empty file
2180        filename = support.TESTFN
2181        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2182            with self.open(filename, 'w', encoding=charset) as f:
2183                f.write('aaa')
2184                pos = f.tell()
2185            with self.open(filename, 'rb') as f:
2186                self.assertEqual(f.read(), 'aaa'.encode(charset))
2187
2188            with self.open(filename, 'a', encoding=charset) as f:
2189                f.write('xxx')
2190            with self.open(filename, 'rb') as f:
2191                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2192
2193    def test_seek_bom(self):
2194        # Same test, but when seeking manually
2195        filename = support.TESTFN
2196        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2197            with self.open(filename, 'w', encoding=charset) as f:
2198                f.write('aaa')
2199                pos = f.tell()
2200            with self.open(filename, 'r+', encoding=charset) as f:
2201                f.seek(pos)
2202                f.write('zzz')
2203                f.seek(0)
2204                f.write('bbb')
2205            with self.open(filename, 'rb') as f:
2206                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2207
2208    def test_errors_property(self):
2209        with self.open(support.TESTFN, "w") as f:
2210            self.assertEqual(f.errors, "strict")
2211        with self.open(support.TESTFN, "w", errors="replace") as f:
2212            self.assertEqual(f.errors, "replace")
2213
2214    @unittest.skipUnless(threading, 'Threading required for this test.')
2215    def test_threads_write(self):
2216        # Issue6750: concurrent writes could duplicate data
2217        event = threading.Event()
2218        with self.open(support.TESTFN, "w", buffering=1) as f:
2219            def run(n):
2220                text = "Thread%03d\n" % n
2221                event.wait()
2222                f.write(text)
2223            threads = [threading.Thread(target=lambda n=x: run(n))
2224                       for x in range(20)]
2225            for t in threads:
2226                t.start()
2227            time.sleep(0.02)
2228            event.set()
2229            for t in threads:
2230                t.join()
2231        with self.open(support.TESTFN) as f:
2232            content = f.read()
2233            for n in range(20):
2234                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2235
2236    def test_flush_error_on_close(self):
2237        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2238        def bad_flush():
2239            raise IOError()
2240        txt.flush = bad_flush
2241        self.assertRaises(IOError, txt.close) # exception not swallowed
2242
2243    def test_multi_close(self):
2244        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2245        txt.close()
2246        txt.close()
2247        txt.close()
2248        self.assertRaises(ValueError, txt.flush)
2249
2250    def test_readonly_attributes(self):
2251        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2252        buf = self.BytesIO(self.testdata)
2253        with self.assertRaises((AttributeError, TypeError)):
2254            txt.buffer = buf
2255
2256class CTextIOWrapperTest(TextIOWrapperTest):
2257
2258    def test_initialization(self):
2259        r = self.BytesIO(b"\xc3\xa9\n\n")
2260        b = self.BufferedReader(r, 1000)
2261        t = self.TextIOWrapper(b)
2262        self.assertRaises(TypeError, t.__init__, b, newline=42)
2263        self.assertRaises(ValueError, t.read)
2264        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2265        self.assertRaises(ValueError, t.read)
2266
2267    def test_garbage_collection(self):
2268        # C TextIOWrapper objects are collected, and collecting them flushes
2269        # all data to disk.
2270        # The Python version has __del__, so it ends in gc.garbage instead.
2271        rawio = io.FileIO(support.TESTFN, "wb")
2272        b = self.BufferedWriter(rawio)
2273        t = self.TextIOWrapper(b, encoding="ascii")
2274        t.write("456def")
2275        t.x = t
2276        wr = weakref.ref(t)
2277        del t
2278        support.gc_collect()
2279        self.assertTrue(wr() is None, wr)
2280        with self.open(support.TESTFN, "rb") as f:
2281            self.assertEqual(f.read(), b"456def")
2282
2283class PyTextIOWrapperTest(TextIOWrapperTest):
2284    pass
2285
2286
2287class IncrementalNewlineDecoderTest(unittest.TestCase):
2288
2289    def check_newline_decoding_utf8(self, decoder):
2290        # UTF-8 specific tests for a newline decoder
2291        def _check_decode(b, s, **kwargs):
2292            # We exercise getstate() / setstate() as well as decode()
2293            state = decoder.getstate()
2294            self.assertEqual(decoder.decode(b, **kwargs), s)
2295            decoder.setstate(state)
2296            self.assertEqual(decoder.decode(b, **kwargs), s)
2297
2298        _check_decode(b'\xe8\xa2\x88', "\u8888")
2299
2300        _check_decode(b'\xe8', "")
2301        _check_decode(b'\xa2', "")
2302        _check_decode(b'\x88', "\u8888")
2303
2304        _check_decode(b'\xe8', "")
2305        _check_decode(b'\xa2', "")
2306        _check_decode(b'\x88', "\u8888")
2307
2308        _check_decode(b'\xe8', "")
2309        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2310
2311        decoder.reset()
2312        _check_decode(b'\n', "\n")
2313        _check_decode(b'\r', "")
2314        _check_decode(b'', "\n", final=True)
2315        _check_decode(b'\r', "\n", final=True)
2316
2317        _check_decode(b'\r', "")
2318        _check_decode(b'a', "\na")
2319
2320        _check_decode(b'\r\r\n', "\n\n")
2321        _check_decode(b'\r', "")
2322        _check_decode(b'\r', "\n")
2323        _check_decode(b'\na', "\na")
2324
2325        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2326        _check_decode(b'\xe8\xa2\x88', "\u8888")
2327        _check_decode(b'\n', "\n")
2328        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2329        _check_decode(b'\n', "\n")
2330
2331    def check_newline_decoding(self, decoder, encoding):
2332        result = []
2333        if encoding is not None:
2334            encoder = codecs.getincrementalencoder(encoding)()
2335            def _decode_bytewise(s):
2336                # Decode one byte at a time
2337                for b in encoder.encode(s):
2338                    result.append(decoder.decode(b))
2339        else:
2340            encoder = None
2341            def _decode_bytewise(s):
2342                # Decode one char at a time
2343                for c in s:
2344                    result.append(decoder.decode(c))
2345        self.assertEqual(decoder.newlines, None)
2346        _decode_bytewise("abc\n\r")
2347        self.assertEqual(decoder.newlines, '\n')
2348        _decode_bytewise("\nabc")
2349        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2350        _decode_bytewise("abc\r")
2351        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2352        _decode_bytewise("abc")
2353        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
2354        _decode_bytewise("abc\r")
2355        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
2356        decoder.reset()
2357        input = "abc"
2358        if encoder is not None:
2359            encoder.reset()
2360            input = encoder.encode(input)
2361        self.assertEqual(decoder.decode(input), "abc")
2362        self.assertEqual(decoder.newlines, None)
2363
2364    def test_newline_decoder(self):
2365        encodings = (
2366            # None meaning the IncrementalNewlineDecoder takes unicode input
2367            # rather than bytes input
2368            None, 'utf-8', 'latin-1',
2369            'utf-16', 'utf-16-le', 'utf-16-be',
2370            'utf-32', 'utf-32-le', 'utf-32-be',
2371        )
2372        for enc in encodings:
2373            decoder = enc and codecs.getincrementaldecoder(enc)()
2374            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2375            self.check_newline_decoding(decoder, enc)
2376        decoder = codecs.getincrementaldecoder("utf-8")()
2377        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2378        self.check_newline_decoding_utf8(decoder)
2379
2380    def test_newline_bytes(self):
2381        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2382        def _check(dec):
2383            self.assertEqual(dec.newlines, None)
2384            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2385            self.assertEqual(dec.newlines, None)
2386            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2387            self.assertEqual(dec.newlines, None)
2388        dec = self.IncrementalNewlineDecoder(None, translate=False)
2389        _check(dec)
2390        dec = self.IncrementalNewlineDecoder(None, translate=True)
2391        _check(dec)
2392
2393class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2394    pass
2395
2396class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2397    pass
2398
2399
2400# XXX Tests for open()
2401
2402class MiscIOTest(unittest.TestCase):
2403
2404    def tearDown(self):
2405        support.unlink(support.TESTFN)
2406
2407    def test___all__(self):
2408        for name in self.io.__all__:
2409            obj = getattr(self.io, name, None)
2410            self.assertTrue(obj is not None, name)
2411            if name == "open":
2412                continue
2413            elif "error" in name.lower() or name == "UnsupportedOperation":
2414                self.assertTrue(issubclass(obj, Exception), name)
2415            elif not name.startswith("SEEK_"):
2416                self.assertTrue(issubclass(obj, self.IOBase))
2417
2418    def test_attributes(self):
2419        f = self.open(support.TESTFN, "wb", buffering=0)
2420        self.assertEqual(f.mode, "wb")
2421        f.close()
2422
2423        f = self.open(support.TESTFN, "U")
2424        self.assertEqual(f.name,            support.TESTFN)
2425        self.assertEqual(f.buffer.name,     support.TESTFN)
2426        self.assertEqual(f.buffer.raw.name, support.TESTFN)
2427        self.assertEqual(f.mode,            "U")
2428        self.assertEqual(f.buffer.mode,     "rb")
2429        self.assertEqual(f.buffer.raw.mode, "rb")
2430        f.close()
2431
2432        f = self.open(support.TESTFN, "w+")
2433        self.assertEqual(f.mode,            "w+")
2434        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
2435        self.assertEqual(f.buffer.raw.mode, "rb+")
2436
2437        g = self.open(f.fileno(), "wb", closefd=False)
2438        self.assertEqual(g.mode,     "wb")
2439        self.assertEqual(g.raw.mode, "wb")
2440        self.assertEqual(g.name,     f.fileno())
2441        self.assertEqual(g.raw.name, f.fileno())
2442        f.close()
2443        g.close()
2444
2445    def test_io_after_close(self):
2446        for kwargs in [
2447                {"mode": "w"},
2448                {"mode": "wb"},
2449                {"mode": "w", "buffering": 1},
2450                {"mode": "w", "buffering": 2},
2451                {"mode": "wb", "buffering": 0},
2452                {"mode": "r"},
2453                {"mode": "rb"},
2454                {"mode": "r", "buffering": 1},
2455                {"mode": "r", "buffering": 2},
2456                {"mode": "rb", "buffering": 0},
2457                {"mode": "w+"},
2458                {"mode": "w+b"},
2459                {"mode": "w+", "buffering": 1},
2460                {"mode": "w+", "buffering": 2},
2461                {"mode": "w+b", "buffering": 0},
2462            ]:
2463            f = self.open(support.TESTFN, **kwargs)
2464            f.close()
2465            self.assertRaises(ValueError, f.flush)
2466            self.assertRaises(ValueError, f.fileno)
2467            self.assertRaises(ValueError, f.isatty)
2468            self.assertRaises(ValueError, f.__iter__)
2469            if hasattr(f, "peek"):
2470                self.assertRaises(ValueError, f.peek, 1)
2471            self.assertRaises(ValueError, f.read)
2472            if hasattr(f, "read1"):
2473                self.assertRaises(ValueError, f.read1, 1024)
2474            if hasattr(f, "readall"):
2475                self.assertRaises(ValueError, f.readall)
2476            if hasattr(f, "readinto"):
2477                self.assertRaises(ValueError, f.readinto, bytearray(1024))
2478            self.assertRaises(ValueError, f.readline)
2479            self.assertRaises(ValueError, f.readlines)
2480            self.assertRaises(ValueError, f.seek, 0)
2481            self.assertRaises(ValueError, f.tell)
2482            self.assertRaises(ValueError, f.truncate)
2483            self.assertRaises(ValueError, f.write,
2484                              b"" if "b" in kwargs['mode'] else "")
2485            self.assertRaises(ValueError, f.writelines, [])
2486            self.assertRaises(ValueError, next, f)
2487
2488    def test_blockingioerror(self):
2489        # Various BlockingIOError issues
2490        self.assertRaises(TypeError, self.BlockingIOError)
2491        self.assertRaises(TypeError, self.BlockingIOError, 1)
2492        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2493        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2494        b = self.BlockingIOError(1, "")
2495        self.assertEqual(b.characters_written, 0)
2496        class C(unicode):
2497            pass
2498        c = C("")
2499        b = self.BlockingIOError(1, c)
2500        c.b = b
2501        b.c = c
2502        wr = weakref.ref(c)
2503        del c, b
2504        support.gc_collect()
2505        self.assertTrue(wr() is None, wr)
2506
2507    def test_abcs(self):
2508        # Test the visible base classes are ABCs.
2509        self.assertIsInstance(self.IOBase, abc.ABCMeta)
2510        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2511        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2512        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
2513
2514    def _check_abc_inheritance(self, abcmodule):
2515        with self.open(support.TESTFN, "wb", buffering=0) as f:
2516            self.assertIsInstance(f, abcmodule.IOBase)
2517            self.assertIsInstance(f, abcmodule.RawIOBase)
2518            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2519            self.assertNotIsInstance(f, abcmodule.TextIOBase)
2520        with self.open(support.TESTFN, "wb") as f:
2521            self.assertIsInstance(f, abcmodule.IOBase)
2522            self.assertNotIsInstance(f, abcmodule.RawIOBase)
2523            self.assertIsInstance(f, abcmodule.BufferedIOBase)
2524            self.assertNotIsInstance(f, abcmodule.TextIOBase)
2525        with self.open(support.TESTFN, "w") as f:
2526            self.assertIsInstance(f, abcmodule.IOBase)
2527            self.assertNotIsInstance(f, abcmodule.RawIOBase)
2528            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2529            self.assertIsInstance(f, abcmodule.TextIOBase)
2530
2531    def test_abc_inheritance(self):
2532        # Test implementations inherit from their respective ABCs
2533        self._check_abc_inheritance(self)
2534
2535    def test_abc_inheritance_official(self):
2536        # Test implementations inherit from the official ABCs of the
2537        # baseline "io" module.
2538        self._check_abc_inheritance(io)
2539
2540class CMiscIOTest(MiscIOTest):
2541    io = io
2542
2543class PyMiscIOTest(MiscIOTest):
2544    io = pyio
2545
2546
2547@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2548class SignalsTest(unittest.TestCase):
2549
2550    def setUp(self):
2551        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2552
2553    def tearDown(self):
2554        signal.signal(signal.SIGALRM, self.oldalrm)
2555
2556    def alarm_interrupt(self, sig, frame):
2557        1 // 0
2558
2559    @unittest.skipUnless(threading, 'Threading required for this test.')
2560    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2561        """Check that a partial write, when it gets interrupted, properly
2562        invokes the signal handler, and bubbles up the exception raised
2563        in the latter."""
2564        read_results = []
2565        def _read():
2566            s = os.read(r, 1)
2567            read_results.append(s)
2568        t = threading.Thread(target=_read)
2569        t.daemon = True
2570        r, w = os.pipe()
2571        try:
2572            wio = self.io.open(w, **fdopen_kwargs)
2573            t.start()
2574            signal.alarm(1)
2575            # Fill the pipe enough that the write will be blocking.
2576            # It will be interrupted by the timer armed above.  Since the
2577            # other thread has read one byte, the low-level write will
2578            # return with a successful (partial) result rather than an EINTR.
2579            # The buffered IO layer must check for pending signal
2580            # handlers, which in this case will invoke alarm_interrupt().
2581            self.assertRaises(ZeroDivisionError,
2582                              wio.write, item * (1024 * 1024))
2583            t.join()
2584            # We got one byte, get another one and check that it isn't a
2585            # repeat of the first one.
2586            read_results.append(os.read(r, 1))
2587            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2588        finally:
2589            os.close(w)
2590            os.close(r)
2591            # This is deliberate. If we didn't close the file descriptor
2592            # before closing wio, wio would try to flush its internal
2593            # buffer, and block again.
2594            try:
2595                wio.close()
2596            except IOError as e:
2597                if e.errno != errno.EBADF:
2598                    raise
2599
2600    def test_interrupted_write_unbuffered(self):
2601        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2602
2603    def test_interrupted_write_buffered(self):
2604        self.check_interrupted_write(b"xy", b"xy", mode="wb")
2605
2606    def test_interrupted_write_text(self):
2607        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2608
2609    def check_reentrant_write(self, data, **fdopen_kwargs):
2610        def on_alarm(*args):
2611            # Will be called reentrantly from the same thread
2612            wio.write(data)
2613            1/0
2614        signal.signal(signal.SIGALRM, on_alarm)
2615        r, w = os.pipe()
2616        wio = self.io.open(w, **fdopen_kwargs)
2617        try:
2618            signal.alarm(1)
2619            # Either the reentrant call to wio.write() fails with RuntimeError,
2620            # or the signal handler raises ZeroDivisionError.
2621            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2622                while 1:
2623                    for i in range(100):
2624                        wio.write(data)
2625                        wio.flush()
2626                    # Make sure the buffer doesn't fill up and block further writes
2627                    os.read(r, len(data) * 100)
2628            exc = cm.exception
2629            if isinstance(exc, RuntimeError):
2630                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2631        finally:
2632            wio.close()
2633            os.close(r)
2634
2635    def test_reentrant_write_buffered(self):
2636        self.check_reentrant_write(b"xy", mode="wb")
2637
2638    def test_reentrant_write_text(self):
2639        self.check_reentrant_write("xy", mode="w", encoding="ascii")
2640
2641    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2642        """Check that a buffered read, when it gets interrupted (either
2643        returning a partial result or EINTR), properly invokes the signal
2644        handler and retries if the latter returned successfully."""
2645        r, w = os.pipe()
2646        fdopen_kwargs["closefd"] = False
2647        def alarm_handler(sig, frame):
2648            os.write(w, b"bar")
2649        signal.signal(signal.SIGALRM, alarm_handler)
2650        try:
2651            rio = self.io.open(r, **fdopen_kwargs)
2652            os.write(w, b"foo")
2653            signal.alarm(1)
2654            # Expected behaviour:
2655            # - first raw read() returns partial b"foo"
2656            # - second raw read() returns EINTR
2657            # - third raw read() returns b"bar"
2658            self.assertEqual(decode(rio.read(6)), "foobar")
2659        finally:
2660            rio.close()
2661            os.close(w)
2662            os.close(r)
2663
2664    def test_interrupterd_read_retry_buffered(self):
2665        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2666                                          mode="rb")
2667
2668    def test_interrupterd_read_retry_text(self):
2669        self.check_interrupted_read_retry(lambda x: x,
2670                                          mode="r")
2671
2672    @unittest.skipUnless(threading, 'Threading required for this test.')
2673    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2674        """Check that a buffered write, when it gets interrupted (either
2675        returning a partial result or EINTR), properly invokes the signal
2676        handler and retries if the latter returned successfully."""
2677        select = support.import_module("select")
2678        # A quantity that exceeds the buffer size of an anonymous pipe's
2679        # write end.
2680        N = 1024 * 1024
2681        r, w = os.pipe()
2682        fdopen_kwargs["closefd"] = False
2683        # We need a separate thread to read from the pipe and allow the
2684        # write() to finish.  This thread is started after the SIGALRM is
2685        # received (forcing a first EINTR in write()).
2686        read_results = []
2687        write_finished = False
2688        def _read():
2689            while not write_finished:
2690                while r in select.select([r], [], [], 1.0)[0]:
2691                    s = os.read(r, 1024)
2692                    read_results.append(s)
2693        t = threading.Thread(target=_read)
2694        t.daemon = True
2695        def alarm1(sig, frame):
2696            signal.signal(signal.SIGALRM, alarm2)
2697            signal.alarm(1)
2698        def alarm2(sig, frame):
2699            t.start()
2700        signal.signal(signal.SIGALRM, alarm1)
2701        try:
2702            wio = self.io.open(w, **fdopen_kwargs)
2703            signal.alarm(1)
2704            # Expected behaviour:
2705            # - first raw write() is partial (because of the limited pipe buffer
2706            #   and the first alarm)
2707            # - second raw write() returns EINTR (because of the second alarm)
2708            # - subsequent write()s are successful (either partial or complete)
2709            self.assertEqual(N, wio.write(item * N))
2710            wio.flush()
2711            write_finished = True
2712            t.join()
2713            self.assertEqual(N, sum(len(x) for x in read_results))
2714        finally:
2715            write_finished = True
2716            os.close(w)
2717            os.close(r)
2718            # This is deliberate. If we didn't close the file descriptor
2719            # before closing wio, wio would try to flush its internal
2720            # buffer, and could block (in case of failure).
2721            try:
2722                wio.close()
2723            except IOError as e:
2724                if e.errno != errno.EBADF:
2725                    raise
2726
2727    def test_interrupterd_write_retry_buffered(self):
2728        self.check_interrupted_write_retry(b"x", mode="wb")
2729
2730    def test_interrupterd_write_retry_text(self):
2731        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2732
2733
2734class CSignalsTest(SignalsTest):
2735    io = io
2736
2737class PySignalsTest(SignalsTest):
2738    io = pyio
2739
2740    # Handling reentrancy issues would slow down _pyio even more, so the
2741    # tests are disabled.
2742    test_reentrant_write_buffered = None
2743    test_reentrant_write_text = None
2744
2745
2746def test_main():
2747    tests = (CIOTest, PyIOTest,
2748             CBufferedReaderTest, PyBufferedReaderTest,
2749             CBufferedWriterTest, PyBufferedWriterTest,
2750             CBufferedRWPairTest, PyBufferedRWPairTest,
2751             CBufferedRandomTest, PyBufferedRandomTest,
2752             StatefulIncrementalDecoderTest,
2753             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2754             CTextIOWrapperTest, PyTextIOWrapperTest,
2755             CMiscIOTest, PyMiscIOTest,
2756             CSignalsTest, PySignalsTest,
2757             )
2758
2759    # Put the namespaces of the IO module we are testing and some useful mock
2760    # classes in the __dict__ of each test.
2761    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2762             MockNonBlockWriterIO, MockRawIOWithoutRead)
2763    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2764    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2765    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2766    globs = globals()
2767    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2768    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2769    # Avoid turning open into a bound method.
2770    py_io_ns["open"] = pyio.OpenWrapper
2771    for test in tests:
2772        if test.__name__.startswith("C"):
2773            for name, obj in c_io_ns.items():
2774                setattr(test, name, obj)
2775        elif test.__name__.startswith("Py"):
2776            for name, obj in py_io_ns.items():
2777                setattr(test, name, obj)
2778
2779    support.run_unittest(*tests)
2780
2781if __name__ == "__main__":
2782    test_main()
2783