• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import sysconfig
32import threading
33import time
34import unittest
35import warnings
36import weakref
37from collections import deque, UserList
38from itertools import cycle, count
39from test import support
40from test.support.script_helper import assert_python_ok, run_python_until_end
41from test.support import FakePath
42
43import codecs
44import io  # C implementation of io
45import _pyio as pyio # Python implementation of io
46
try:
    import ctypes
except ImportError:
    # Without ctypes, fall back to an array.array of signed chars.
    def byteslike(*pos, **kw):
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the field-less structure so that it can hold the payload,
        # then copy the payload in through the buffer protocol.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
62
# True when the interpreter's build settings mention the memory sanitizer,
# either as a compiler flag or as a configure argument.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
MEMORY_SANITIZER = (
    '--with-memory-sanitizer' in _config_args or
    '-fsanitize=memory' in _cflags
)
69
# Whether the io.IOBase finalizer logs the exception when close() fails.
# By default a release build ignores that exception silently.
IOBASE_EMITS_UNRAISABLE = (
    hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode
)
73
74
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # The value is read off a freshly opened text stream on this very file.
    stream = open(__file__, "r", encoding="latin-1")
    try:
        return stream._CHUNK_SIZE
    finally:
        stream.close()
79
80
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks that readinto() will serve, in order; a None entry makes one
        # readinto() call return None.
        self._read_stack = list(read_stack)
        # Every payload given to write(), stored as bytes.
        self._write_stack = []
        # Total readinto() calls, and how many of them found the read stack
        # already empty.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same caveat as seek().
        return 0

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            # Nothing left to serve: behave like end of file.
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Serve what fits and keep the remainder for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
136
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
139
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
142
143
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() served straight from the read stack."""

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Once the read stack is empty, the call is counted as extraneous and
        b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty stack means "no more data".  Anything else (for
            # instance KeyboardInterrupt) must propagate instead of being
            # reported as an empty read.
            self._extraneous_reads += 1
            return b""
153
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
156
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
159
160
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO variant whose methods return doubled, negative or
    oversized results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reported.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer as the parent does, then report five times its size.
        super().readinto(buf)
        return 5 * len(buf)
177
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
180
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
183
184
class SlowFlushRawIO(MockRawIO):
    """A MockRawIO whose flush() announces that it started, then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
193
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C implementation's RawIOBase."""
196
class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the Python implementation's RawIOBase."""
199
200
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises OSError."""
    closed = 0

    def close(self):
        if self.closed:
            return
        # Flag the object as closed before failing, so later calls do nothing.
        self.closed = 1
        raise OSError
208
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
211
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
214
215
class MockFileIO:
    """Mixin that records the size of every read() and readinto() result."""

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
231
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C implementation's BytesIO."""
234
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the Python implementation's BytesIO."""
237
238
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    The concrete class has to supply an ``UnsupportedOperation`` attribute;
    it is the exception type raised by seek(), tell() and truncate().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
251
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C implementation's BytesIO."""
    UnsupportedOperation = io.UnsupportedOperation
254
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the Python implementation's BytesIO."""
    UnsupportedOperation = pyio.UnsupportedOperation
257
258
class MockNonBlockWriterIO:
    """Mixin for a raw writer that can be told to stop at a given byte.

    Written data is collected in memory; see block_on() and write().
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything written so far and forget it.
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                pos = payload.index(blocker)
            except ValueError:
                # Blocker absent: the whole payload goes through below.
                pos = -1
            if pos == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            if pos > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:pos])
                return pos
        self._write_stack.append(payload)
        return len(payload)
306
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""
    BlockingIOError = io.BlockingIOError
309
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
312
313
314class IOTest(unittest.TestCase):
315
316    def setUp(self):
317        support.unlink(support.TESTFN)
318
319    def tearDown(self):
320        support.unlink(support.TESTFN)
321
322    def write_ops(self, f):
323        self.assertEqual(f.write(b"blah."), 5)
324        f.truncate(0)
325        self.assertEqual(f.tell(), 5)
326        f.seek(0)
327
328        self.assertEqual(f.write(b"blah."), 5)
329        self.assertEqual(f.seek(0), 0)
330        self.assertEqual(f.write(b"Hello."), 6)
331        self.assertEqual(f.tell(), 6)
332        self.assertEqual(f.seek(-1, 1), 5)
333        self.assertEqual(f.tell(), 5)
334        buffer = bytearray(b" world\n\n\n")
335        self.assertEqual(f.write(buffer), 9)
336        buffer[:] = b"*" * 9  # Overwrite our copy of the data
337        self.assertEqual(f.seek(0), 0)
338        self.assertEqual(f.write(b"h"), 1)
339        self.assertEqual(f.seek(-1, 2), 13)
340        self.assertEqual(f.tell(), 13)
341
342        self.assertEqual(f.truncate(12), 12)
343        self.assertEqual(f.tell(), 13)
344        self.assertRaises(TypeError, f.seek, 0.0)
345
346    def read_ops(self, f, buffered=False):
347        data = f.read(5)
348        self.assertEqual(data, b"hello")
349        data = byteslike(data)
350        self.assertEqual(f.readinto(data), 5)
351        self.assertEqual(bytes(data), b" worl")
352        data = bytearray(5)
353        self.assertEqual(f.readinto(data), 2)
354        self.assertEqual(len(data), 5)
355        self.assertEqual(data[:2], b"d\n")
356        self.assertEqual(f.seek(0), 0)
357        self.assertEqual(f.read(20), b"hello world\n")
358        self.assertEqual(f.read(1), b"")
359        self.assertEqual(f.readinto(byteslike(b"x")), 0)
360        self.assertEqual(f.seek(-6, 2), 6)
361        self.assertEqual(f.read(5), b"world")
362        self.assertEqual(f.read(0), b"")
363        self.assertEqual(f.readinto(byteslike()), 0)
364        self.assertEqual(f.seek(-6, 1), 5)
365        self.assertEqual(f.read(5), b" worl")
366        self.assertEqual(f.tell(), 10)
367        self.assertRaises(TypeError, f.seek, 0.0)
368        if buffered:
369            f.seek(0)
370            self.assertEqual(f.read(), b"hello world\n")
371            f.seek(6)
372            self.assertEqual(f.read(), b"world\n")
373            self.assertEqual(f.read(), b"")
374            f.seek(0)
375            data = byteslike(5)
376            self.assertEqual(f.readinto1(data), 5)
377            self.assertEqual(bytes(data), b"hello")
378
379    LARGE = 2**31
380
381    def large_file_ops(self, f):
382        assert f.readable()
383        assert f.writable()
384        try:
385            self.assertEqual(f.seek(self.LARGE), self.LARGE)
386        except (OverflowError, ValueError):
387            self.skipTest("no largefile support")
388        self.assertEqual(f.tell(), self.LARGE)
389        self.assertEqual(f.write(b"xxx"), 3)
390        self.assertEqual(f.tell(), self.LARGE + 3)
391        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
392        self.assertEqual(f.truncate(), self.LARGE + 2)
393        self.assertEqual(f.tell(), self.LARGE + 2)
394        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
395        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
396        self.assertEqual(f.tell(), self.LARGE + 2)
397        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
398        self.assertEqual(f.seek(-1, 2), self.LARGE)
399        self.assertEqual(f.read(2), b"x")
400
401    def test_invalid_operations(self):
402        # Try writing on a file opened in read mode and vice-versa.
403        exc = self.UnsupportedOperation
404        for mode in ("w", "wb"):
405            with self.open(support.TESTFN, mode) as fp:
406                self.assertRaises(exc, fp.read)
407                self.assertRaises(exc, fp.readline)
408        with self.open(support.TESTFN, "wb", buffering=0) as fp:
409            self.assertRaises(exc, fp.read)
410            self.assertRaises(exc, fp.readline)
411        with self.open(support.TESTFN, "rb", buffering=0) as fp:
412            self.assertRaises(exc, fp.write, b"blah")
413            self.assertRaises(exc, fp.writelines, [b"blah\n"])
414        with self.open(support.TESTFN, "rb") as fp:
415            self.assertRaises(exc, fp.write, b"blah")
416            self.assertRaises(exc, fp.writelines, [b"blah\n"])
417        with self.open(support.TESTFN, "r") as fp:
418            self.assertRaises(exc, fp.write, "blah")
419            self.assertRaises(exc, fp.writelines, ["blah\n"])
420            # Non-zero seeking from current or end pos
421            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
422            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
423
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.
        #
        # Each factory below builds one object under test.  In the `tests`
        # table it is paired with a string of ability letters:
        # "f" = fileno(), "r" = readable, "w" = writable, "s" = seekable.

        def pipe_reader():
            # Raw reader on the read end of a pipe.
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            # Raw writer on the write end of a pipe; the read end is closed
            # during cleanup.
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Text wrapper over a stream whose writable()/write() are the
            # BufferedIOBase versions.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Text wrapper over a stream whose readable()/read() are the
            # BufferedIOBase versions.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the object works with.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                # The seek-related checks are skipped for pipes on Windows.
                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() is expected to work only on objects that are
                # both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
527
528    def test_open_handles_NUL_chars(self):
529        fn_with_NUL = 'foo\0bar'
530        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
531
532        bytes_fn = bytes(fn_with_NUL, 'ascii')
533        with warnings.catch_warnings():
534            warnings.simplefilter("ignore", DeprecationWarning)
535            self.assertRaises(ValueError, self.open, bytes_fn, 'w')
536
537    def test_raw_file_io(self):
538        with self.open(support.TESTFN, "wb", buffering=0) as f:
539            self.assertEqual(f.readable(), False)
540            self.assertEqual(f.writable(), True)
541            self.assertEqual(f.seekable(), True)
542            self.write_ops(f)
543        with self.open(support.TESTFN, "rb", buffering=0) as f:
544            self.assertEqual(f.readable(), True)
545            self.assertEqual(f.writable(), False)
546            self.assertEqual(f.seekable(), True)
547            self.read_ops(f)
548
549    def test_buffered_file_io(self):
550        with self.open(support.TESTFN, "wb") as f:
551            self.assertEqual(f.readable(), False)
552            self.assertEqual(f.writable(), True)
553            self.assertEqual(f.seekable(), True)
554            self.write_ops(f)
555        with self.open(support.TESTFN, "rb") as f:
556            self.assertEqual(f.readable(), True)
557            self.assertEqual(f.writable(), False)
558            self.assertEqual(f.seekable(), True)
559            self.read_ops(f, True)
560
561    def test_readline(self):
562        with self.open(support.TESTFN, "wb") as f:
563            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
564        with self.open(support.TESTFN, "rb") as f:
565            self.assertEqual(f.readline(), b"abc\n")
566            self.assertEqual(f.readline(10), b"def\n")
567            self.assertEqual(f.readline(2), b"xy")
568            self.assertEqual(f.readline(4), b"zzy\n")
569            self.assertEqual(f.readline(), b"foo\x00bar\n")
570            self.assertEqual(f.readline(None), b"another line")
571            self.assertRaises(TypeError, f.readline, 5.3)
572        with self.open(support.TESTFN, "r") as f:
573            self.assertRaises(TypeError, f.readline, 5.3)
574
575    def test_readline_nonsizeable(self):
576        # Issue #30061
577        # Crash when readline() returns an object without __len__
578        class R(self.IOBase):
579            def readline(self):
580                return None
581        self.assertRaises((TypeError, StopIteration), next, R())
582
583    def test_next_nonsizeable(self):
584        # Issue #30061
585        # Crash when __next__() returns an object without __len__
586        class R(self.IOBase):
587            def __next__(self):
588                return None
589        self.assertRaises(TypeError, R().readlines, 1)
590
591    def test_raw_bytes_io(self):
592        f = self.BytesIO()
593        self.write_ops(f)
594        data = f.getvalue()
595        self.assertEqual(data, b"hello world\n")
596        f = self.BytesIO(data)
597        self.read_ops(f, True)
598
599    def test_large_file_ops(self):
600        # On Windows and Mac OSX this test consumes large resources; It takes
601        # a long time to build the >2 GiB file and takes >2 GiB of disk space
602        # therefore the resource must be enabled to run this test.
603        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
604            support.requires(
605                'largefile',
606                'test requires %s bytes and a long time to run' % self.LARGE)
607        with self.open(support.TESTFN, "w+b", 0) as f:
608            self.large_file_ops(f)
609        with self.open(support.TESTFN, "w+b") as f:
610            self.large_file_ops(f)
611
612    def test_with_open(self):
613        for bufsize in (0, 100):
614            f = None
615            with self.open(support.TESTFN, "wb", bufsize) as f:
616                f.write(b"xxx")
617            self.assertEqual(f.closed, True)
618            f = None
619            try:
620                with self.open(support.TESTFN, "wb", bufsize) as f:
621                    1/0
622            except ZeroDivisionError:
623                self.assertEqual(f.closed, True)
624            else:
625                self.fail("1/0 didn't raise an exception")
626
627    # issue 5008
628    def test_append_mode_tell(self):
629        with self.open(support.TESTFN, "wb") as f:
630            f.write(b"xxx")
631        with self.open(support.TESTFN, "ab", buffering=0) as f:
632            self.assertEqual(f.tell(), 3)
633        with self.open(support.TESTFN, "ab") as f:
634            self.assertEqual(f.tell(), 3)
635        with self.open(support.TESTFN, "a") as f:
636            self.assertGreater(f.tell(), 0)
637
638    def test_destructor(self):
639        record = []
640        class MyFileIO(self.FileIO):
641            def __del__(self):
642                record.append(1)
643                try:
644                    f = super().__del__
645                except AttributeError:
646                    pass
647                else:
648                    f()
649            def close(self):
650                record.append(2)
651                super().close()
652            def flush(self):
653                record.append(3)
654                super().flush()
655        with support.check_warnings(('', ResourceWarning)):
656            f = MyFileIO(support.TESTFN, "wb")
657            f.write(b"xxx")
658            del f
659            support.gc_collect()
660            self.assertEqual(record, [1, 2, 3])
661            with self.open(support.TESTFN, "rb") as f:
662                self.assertEqual(f.read(), b"xxx")
663
664    def _check_base_destructor(self, base):
665        record = []
666        class MyIO(base):
667            def __init__(self):
668                # This exercises the availability of attributes on object
669                # destruction.
670                # (in the C version, close() is called by the tp_dealloc
671                # function, not by __del__)
672                self.on_del = 1
673                self.on_close = 2
674                self.on_flush = 3
675            def __del__(self):
676                record.append(self.on_del)
677                try:
678                    f = super().__del__
679                except AttributeError:
680                    pass
681                else:
682                    f()
683            def close(self):
684                record.append(self.on_close)
685                super().close()
686            def flush(self):
687                record.append(self.on_flush)
688                super().flush()
689        f = MyIO()
690        del f
691        support.gc_collect()
692        self.assertEqual(record, [1, 2, 3])
693
694    def test_IOBase_destructor(self):
695        self._check_base_destructor(self.IOBase)
696
697    def test_RawIOBase_destructor(self):
698        self._check_base_destructor(self.RawIOBase)
699
700    def test_BufferedIOBase_destructor(self):
701        self._check_base_destructor(self.BufferedIOBase)
702
703    def test_TextIOBase_destructor(self):
704        self._check_base_destructor(self.TextIOBase)
705
706    def test_close_flushes(self):
707        with self.open(support.TESTFN, "wb") as f:
708            f.write(b"xxx")
709        with self.open(support.TESTFN, "rb") as f:
710            self.assertEqual(f.read(), b"xxx")
711
712    def test_array_writes(self):
713        a = array.array('i', range(10))
714        n = len(a.tobytes())
715        def check(f):
716            with f:
717                self.assertEqual(f.write(a), n)
718                f.writelines((a,))
719        check(self.BytesIO())
720        check(self.FileIO(support.TESTFN, "w"))
721        check(self.BufferedWriter(self.MockRawIO()))
722        check(self.BufferedRandom(self.MockRawIO()))
723        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
724
725    def test_closefd(self):
726        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
727                          closefd=False)
728
729    def test_read_closed(self):
730        with self.open(support.TESTFN, "w") as f:
731            f.write("egg\n")
732        with self.open(support.TESTFN, "r") as f:
733            file = self.open(f.fileno(), "r", closefd=False)
734            self.assertEqual(file.read(), "egg\n")
735            file.seek(0)
736            file.close()
737            self.assertRaises(ValueError, file.read)
738        with self.open(support.TESTFN, "rb") as f:
739            file = self.open(f.fileno(), "rb", closefd=False)
740            self.assertEqual(file.read()[:3], b"egg")
741            file.close()
742            self.assertRaises(ValueError, file.readinto, bytearray(1))
743
744    def test_no_closefd_with_filename(self):
745        # can't use closefd in combination with a file name
746        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
747
748    def test_closefd_attr(self):
749        with self.open(support.TESTFN, "wb") as f:
750            f.write(b"egg\n")
751        with self.open(support.TESTFN, "r") as f:
752            self.assertEqual(f.buffer.raw.closefd, True)
753            file = self.open(f.fileno(), "r", closefd=False)
754            self.assertEqual(file.buffer.raw.closefd, False)
755
756    def test_garbage_collection(self):
757        # FileIO objects are collected, and collecting them flushes
758        # all data to disk.
759        with support.check_warnings(('', ResourceWarning)):
760            f = self.FileIO(support.TESTFN, "wb")
761            f.write(b"abcxxx")
762            f.f = f
763            wr = weakref.ref(f)
764            del f
765            support.gc_collect()
766        self.assertIsNone(wr(), wr)
767        with self.open(support.TESTFN, "rb") as f:
768            self.assertEqual(f.read(), b"abcxxx")
769
770    def test_unbounded_file(self):
771        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
772        zero = "/dev/zero"
773        if not os.path.exists(zero):
774            self.skipTest("{0} does not exist".format(zero))
775        if sys.maxsize > 0x7FFFFFFF:
776            self.skipTest("test can only run in a 32-bit address space")
777        if support.real_max_memuse < support._2G:
778            self.skipTest("test requires at least 2 GiB of memory")
779        with self.open(zero, "rb", buffering=0) as f:
780            self.assertRaises(OverflowError, f.read)
781        with self.open(zero, "rb") as f:
782            self.assertRaises(OverflowError, f.read)
783        with self.open(zero, "r") as f:
784            self.assertRaises(OverflowError, f.read)
785
786    def check_flush_error_on_close(self, *args, **kwargs):
787        # Test that the file is closed despite failed flush
788        # and that flush() is called before file closed.
789        f = self.open(*args, **kwargs)
790        closed = []
791        def bad_flush():
792            closed[:] = [f.closed]
793            raise OSError()
794        f.flush = bad_flush
795        self.assertRaises(OSError, f.close) # exception not swallowed
796        self.assertTrue(f.closed)
797        self.assertTrue(closed)      # flush() called
798        self.assertFalse(closed[0])  # flush() called before file closed
799        f.flush = lambda: None  # break reference loop
800
801    def test_flush_error_on_close(self):
802        # raw file
803        # Issue #5700: io.FileIO calls flush() after file closed
804        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
805        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
806        self.check_flush_error_on_close(fd, 'wb', buffering=0)
807        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
808        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
809        os.close(fd)
810        # buffered io
811        self.check_flush_error_on_close(support.TESTFN, 'wb')
812        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
813        self.check_flush_error_on_close(fd, 'wb')
814        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
815        self.check_flush_error_on_close(fd, 'wb', closefd=False)
816        os.close(fd)
817        # text io
818        self.check_flush_error_on_close(support.TESTFN, 'w')
819        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
820        self.check_flush_error_on_close(fd, 'w')
821        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
822        self.check_flush_error_on_close(fd, 'w', closefd=False)
823        os.close(fd)
824
825    def test_multi_close(self):
826        f = self.open(support.TESTFN, "wb", buffering=0)
827        f.close()
828        f.close()
829        f.close()
830        self.assertRaises(ValueError, f.flush)
831
832    def test_RawIOBase_read(self):
833        # Exercise the default limited RawIOBase.read(n) implementation (which
834        # calls readinto() internally).
835        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
836        self.assertEqual(rawio.read(2), b"ab")
837        self.assertEqual(rawio.read(2), b"c")
838        self.assertEqual(rawio.read(2), b"d")
839        self.assertEqual(rawio.read(2), None)
840        self.assertEqual(rawio.read(2), b"ef")
841        self.assertEqual(rawio.read(2), b"g")
842        self.assertEqual(rawio.read(2), None)
843        self.assertEqual(rawio.read(2), b"")
844
845    def test_types_have_dict(self):
846        test = (
847            self.IOBase(),
848            self.RawIOBase(),
849            self.TextIOBase(),
850            self.StringIO(),
851            self.BytesIO()
852        )
853        for obj in test:
854            self.assertTrue(hasattr(obj, "__dict__"))
855
856    def test_opener(self):
857        with self.open(support.TESTFN, "w") as f:
858            f.write("egg\n")
859        fd = os.open(support.TESTFN, os.O_RDONLY)
860        def opener(path, flags):
861            return fd
862        with self.open("non-existent", "r", opener=opener) as f:
863            self.assertEqual(f.read(), "egg\n")
864
865    def test_bad_opener_negative_1(self):
866        # Issue #27066.
867        def badopener(fname, flags):
868            return -1
869        with self.assertRaises(ValueError) as cm:
870            open('non-existent', 'r', opener=badopener)
871        self.assertEqual(str(cm.exception), 'opener returned -1')
872
873    def test_bad_opener_other_negative(self):
874        # Issue #27066.
875        def badopener(fname, flags):
876            return -2
877        with self.assertRaises(ValueError) as cm:
878            open('non-existent', 'r', opener=badopener)
879        self.assertEqual(str(cm.exception), 'opener returned -2')
880
881    def test_fileio_closefd(self):
882        # Issue #4841
883        with self.open(__file__, 'rb') as f1, \
884             self.open(__file__, 'rb') as f2:
885            fileio = self.FileIO(f1.fileno(), closefd=False)
886            # .__init__() must not close f1
887            fileio.__init__(f2.fileno(), closefd=False)
888            f1.readline()
889            # .close() must not close f2
890            fileio.close()
891            f2.readline()
892
893    def test_nonbuffered_textio(self):
894        with support.check_no_resource_warning(self):
895            with self.assertRaises(ValueError):
896                self.open(support.TESTFN, 'w', buffering=0)
897
898    def test_invalid_newline(self):
899        with support.check_no_resource_warning(self):
900            with self.assertRaises(ValueError):
901                self.open(support.TESTFN, 'w', newline='invalid')
902
903    def test_buffered_readinto_mixin(self):
904        # Test the implementation provided by BufferedIOBase
905        class Stream(self.BufferedIOBase):
906            def read(self, size):
907                return b"12345"
908            read1 = read
909        stream = Stream()
910        for method in ("readinto", "readinto1"):
911            with self.subTest(method):
912                buffer = byteslike(5)
913                self.assertEqual(getattr(stream, method)(buffer), 5)
914                self.assertEqual(bytes(buffer), b"12345")
915
916    def test_fspath_support(self):
917        def check_path_succeeds(path):
918            with self.open(path, "w") as f:
919                f.write("egg\n")
920
921            with self.open(path, "r") as f:
922                self.assertEqual(f.read(), "egg\n")
923
924        check_path_succeeds(FakePath(support.TESTFN))
925        check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
926
927        with self.open(support.TESTFN, "w") as f:
928            bad_path = FakePath(f.fileno())
929            with self.assertRaises(TypeError):
930                self.open(bad_path, 'w')
931
932        bad_path = FakePath(None)
933        with self.assertRaises(TypeError):
934            self.open(bad_path, 'w')
935
936        bad_path = FakePath(FloatingPointError)
937        with self.assertRaises(FloatingPointError):
938            self.open(bad_path, 'w')
939
940        # ensure that refcounting is correct with some error conditions
941        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
942            self.open(FakePath(support.TESTFN), 'rwxa')
943
944    def test_RawIOBase_readall(self):
945        # Exercise the default unlimited RawIOBase.read() and readall()
946        # implementations.
947        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
948        self.assertEqual(rawio.read(), b"abcdefg")
949        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
950        self.assertEqual(rawio.readall(), b"abcdefg")
951
952    def test_BufferedIOBase_readinto(self):
953        # Exercise the default BufferedIOBase.readinto() and readinto1()
954        # implementations (which call read() or read1() internally).
955        class Reader(self.BufferedIOBase):
956            def __init__(self, avail):
957                self.avail = avail
958            def read(self, size):
959                result = self.avail[:size]
960                self.avail = self.avail[size:]
961                return result
962            def read1(self, size):
963                """Returns no more than 5 bytes at once"""
964                return self.read(min(size, 5))
965        tests = (
966            # (test method, total data available, read buffer size, expected
967            #     read size)
968            ("readinto", 10, 5, 5),
969            ("readinto", 10, 6, 6),  # More than read1() can return
970            ("readinto", 5, 6, 5),  # Buffer larger than total available
971            ("readinto", 6, 7, 6),
972            ("readinto", 10, 0, 0),  # Empty buffer
973            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
974            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
975            ("readinto1", 5, 6, 5),  # Buffer larger than total available
976            ("readinto1", 6, 7, 5),
977            ("readinto1", 10, 0, 0),  # Empty buffer
978        )
979        UNUSED_BYTE = 0x81
980        for test in tests:
981            with self.subTest(test):
982                method, avail, request, result = test
983                reader = Reader(bytes(range(avail)))
984                buffer = bytearray((UNUSED_BYTE,) * request)
985                method = getattr(reader, method)
986                self.assertEqual(method(buffer), result)
987                self.assertEqual(len(buffer), request)
988                self.assertSequenceEqual(buffer[:result], range(result))
989                unused = (UNUSED_BYTE,) * (request - result)
990                self.assertSequenceEqual(buffer[result:], unused)
991                self.assertEqual(len(reader.avail), avail - result)
992
993    def test_close_assert(self):
994        class R(self.IOBase):
995            def __setattr__(self, name, value):
996                pass
997            def flush(self):
998                raise OSError()
999        f = R()
1000        # This would cause an assertion failure.
1001        self.assertRaises(OSError, f.close)
1002
1003        # Silence destructor error
1004        R.flush = lambda self: None
1005
1006
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class NoopCloseIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        NoopCloseIO()
        # Build a self-referencing instance so that only the cycle collector
        # can reclaim it.
        cyclic = NoopCloseIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        del NoopCloseIO, cyclic
        support.gc_collect()
        self.assertIsNone(ref(), ref)
1026
class PyIOTest(IOTest):
    # Adds no tests of its own; it only inherits everything from IOTest.
    pass
1029
1030
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Both directions of the comparison between io.RawIOBase and
    # _pyio.RawIOBase must report an empty set of mismatches.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        ignored = ('__weakref__',)
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=ignored)
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1044
1045
1046class CommonBufferedTests:
1047    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1048
1049    def test_detach(self):
1050        raw = self.MockRawIO()
1051        buf = self.tp(raw)
1052        self.assertIs(buf.detach(), raw)
1053        self.assertRaises(ValueError, buf.detach)
1054
1055        repr(buf)  # Should still work
1056
1057    def test_fileno(self):
1058        rawio = self.MockRawIO()
1059        bufio = self.tp(rawio)
1060
1061        self.assertEqual(42, bufio.fileno())
1062
1063    def test_invalid_args(self):
1064        rawio = self.MockRawIO()
1065        bufio = self.tp(rawio)
1066        # Invalid whence
1067        self.assertRaises(ValueError, bufio.seek, 0, -1)
1068        self.assertRaises(ValueError, bufio.seek, 0, 9)
1069
    def test_override_destructor(self):
        # A subclass overriding __del__(), close() and flush() records each
        # call; destroying the object must produce the calls in that order.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # Chain to the base class destructor only if it defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Drop the only reference and force a collection.
        del bufio
        support.gc_collect()
        # 1 = __del__, 2 = close, 3 = flush
        self.assertEqual(record, [1, 2, 3])
1093
1094    def test_context_manager(self):
1095        # Test usability as a context manager
1096        rawio = self.MockRawIO()
1097        bufio = self.tp(rawio)
1098        def _with():
1099            with bufio:
1100                pass
1101        _with()
1102        # bufio should now be closed, and using it a second time should raise
1103        # a ValueError.
1104        self.assertRaises(ValueError, _with)
1105
1106    def test_error_through_destructor(self):
1107        # Test that the exception state is not modified by a destructor,
1108        # even if close() fails.
1109        rawio = self.CloseFailureIO()
1110        with support.catch_unraisable_exception() as cm:
1111            with self.assertRaises(AttributeError):
1112                self.tp(rawio).xyzzy
1113
1114            if not IOBASE_EMITS_UNRAISABLE:
1115                self.assertIsNone(cm.unraisable)
1116            elif cm.unraisable is not None:
1117                self.assertEqual(cm.unraisable.exc_type, OSError)
1118
1119    def test_repr(self):
1120        raw = self.MockRawIO()
1121        b = self.tp(raw)
1122        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1123        self.assertRegex(repr(b), "<%s>" % clsname)
1124        raw.name = "dummy"
1125        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
1126        raw.name = b"dummy"
1127        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
1128
1129    def test_recursive_repr(self):
1130        # Issue #25455
1131        raw = self.MockRawIO()
1132        b = self.tp(raw)
1133        with support.swap_attr(raw, 'name', b):
1134            try:
1135                repr(b)  # Should not crash
1136            except RuntimeError:
1137                pass
1138
1139    def test_flush_error_on_close(self):
1140        # Test that buffered file is closed despite failed flush
1141        # and that flush() is called before file closed.
1142        raw = self.MockRawIO()
1143        closed = []
1144        def bad_flush():
1145            closed[:] = [b.closed, raw.closed]
1146            raise OSError()
1147        raw.flush = bad_flush
1148        b = self.tp(raw)
1149        self.assertRaises(OSError, b.close) # exception not swallowed
1150        self.assertTrue(b.closed)
1151        self.assertTrue(raw.closed)
1152        self.assertTrue(closed)      # flush() called
1153        self.assertFalse(closed[0])  # flush() called before file closed
1154        self.assertFalse(closed[1])
1155        raw.flush = lambda: None  # break reference loop
1156
1157    def test_close_error_on_close(self):
1158        raw = self.MockRawIO()
1159        def bad_flush():
1160            raise OSError('flush')
1161        def bad_close():
1162            raise OSError('close')
1163        raw.close = bad_close
1164        b = self.tp(raw)
1165        b.flush = bad_flush
1166        with self.assertRaises(OSError) as err: # exception not swallowed
1167            b.close()
1168        self.assertEqual(err.exception.args, ('close',))
1169        self.assertIsInstance(err.exception.__context__, OSError)
1170        self.assertEqual(err.exception.__context__.args, ('flush',))
1171        self.assertFalse(b.closed)
1172
1173        # Silence destructor error
1174        raw.close = lambda: None
1175        b.flush = lambda: None
1176
1177    def test_nonnormalized_close_error_on_close(self):
1178        # Issue #21677
1179        raw = self.MockRawIO()
1180        def bad_flush():
1181            raise non_existing_flush
1182        def bad_close():
1183            raise non_existing_close
1184        raw.close = bad_close
1185        b = self.tp(raw)
1186        b.flush = bad_flush
1187        with self.assertRaises(NameError) as err: # exception not swallowed
1188            b.close()
1189        self.assertIn('non_existing_close', str(err.exception))
1190        self.assertIsInstance(err.exception.__context__, NameError)
1191        self.assertIn('non_existing_flush', str(err.exception.__context__))
1192        self.assertFalse(b.closed)
1193
1194        # Silence destructor error
1195        b.flush = lambda: None
1196        raw.close = lambda: None
1197
1198    def test_multi_close(self):
1199        raw = self.MockRawIO()
1200        b = self.tp(raw)
1201        b.close()
1202        b.close()
1203        b.close()
1204        self.assertRaises(ValueError, b.flush)
1205
1206    def test_unseekable(self):
1207        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1208        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1209        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1210
1211    def test_readonly_attributes(self):
1212        raw = self.MockRawIO()
1213        buf = self.tp(raw)
1214        x = self.MockRawIO()
1215        with self.assertRaises(AttributeError):
1216            buf.raw = x
1217
1218
class SizeofTest:
    # sys.getsizeof() checks for buffered objects; relies on self.tp and
    # self.MockRawIO being available on the test instance.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference between the
        # two buffer sizes.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must no longer include the buffer.
        bufsize = 4096
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
1240
1241class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1242    read_mode = "rb"
1243
1244    def test_constructor(self):
1245        rawio = self.MockRawIO([b"abc"])
1246        bufio = self.tp(rawio)
1247        bufio.__init__(rawio)
1248        bufio.__init__(rawio, buffer_size=1024)
1249        bufio.__init__(rawio, buffer_size=16)
1250        self.assertEqual(b"abc", bufio.read())
1251        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1252        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1253        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1254        rawio = self.MockRawIO([b"abc"])
1255        bufio.__init__(rawio)
1256        self.assertEqual(b"abc", bufio.read())
1257
1258    def test_uninitialized(self):
1259        bufio = self.tp.__new__(self.tp)
1260        del bufio
1261        bufio = self.tp.__new__(self.tp)
1262        self.assertRaisesRegex((ValueError, AttributeError),
1263                               'uninitialized|has no attribute',
1264                               bufio.read, 0)
1265        bufio.__init__(self.MockRawIO())
1266        self.assertEqual(bufio.read(0), b'')
1267
1268    def test_read(self):
1269        for arg in (None, 7):
1270            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1271            bufio = self.tp(rawio)
1272            self.assertEqual(b"abcdefg", bufio.read(arg))
1273        # Invalid args
1274        self.assertRaises(ValueError, bufio.read, -2)
1275
1276    def test_read1(self):
1277        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1278        bufio = self.tp(rawio)
1279        self.assertEqual(b"a", bufio.read(1))
1280        self.assertEqual(b"b", bufio.read1(1))
1281        self.assertEqual(rawio._reads, 1)
1282        self.assertEqual(b"", bufio.read1(0))
1283        self.assertEqual(b"c", bufio.read1(100))
1284        self.assertEqual(rawio._reads, 1)
1285        self.assertEqual(b"d", bufio.read1(100))
1286        self.assertEqual(rawio._reads, 2)
1287        self.assertEqual(b"efg", bufio.read1(100))
1288        self.assertEqual(rawio._reads, 3)
1289        self.assertEqual(b"", bufio.read1(100))
1290        self.assertEqual(rawio._reads, 4)
1291
1292    def test_read1_arbitrary(self):
1293        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1294        bufio = self.tp(rawio)
1295        self.assertEqual(b"a", bufio.read(1))
1296        self.assertEqual(b"bc", bufio.read1())
1297        self.assertEqual(b"d", bufio.read1())
1298        self.assertEqual(b"efg", bufio.read1(-1))
1299        self.assertEqual(rawio._reads, 3)
1300        self.assertEqual(b"", bufio.read1())
1301        self.assertEqual(rawio._reads, 4)
1302
1303    def test_readinto(self):
1304        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1305        bufio = self.tp(rawio)
1306        b = bytearray(2)
1307        self.assertEqual(bufio.readinto(b), 2)
1308        self.assertEqual(b, b"ab")
1309        self.assertEqual(bufio.readinto(b), 2)
1310        self.assertEqual(b, b"cd")
1311        self.assertEqual(bufio.readinto(b), 2)
1312        self.assertEqual(b, b"ef")
1313        self.assertEqual(bufio.readinto(b), 1)
1314        self.assertEqual(b, b"gf")
1315        self.assertEqual(bufio.readinto(b), 0)
1316        self.assertEqual(b, b"gf")
1317        rawio = self.MockRawIO((b"abc", None))
1318        bufio = self.tp(rawio)
1319        self.assertEqual(bufio.readinto(b), 2)
1320        self.assertEqual(b, b"ab")
1321        self.assertEqual(bufio.readinto(b), 1)
1322        self.assertEqual(b, b"cb")
1323
1324    def test_readinto1(self):
1325        buffer_size = 10
1326        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1327        bufio = self.tp(rawio, buffer_size=buffer_size)
1328        b = bytearray(2)
1329        self.assertEqual(bufio.peek(3), b'abc')
1330        self.assertEqual(rawio._reads, 1)
1331        self.assertEqual(bufio.readinto1(b), 2)
1332        self.assertEqual(b, b"ab")
1333        self.assertEqual(rawio._reads, 1)
1334        self.assertEqual(bufio.readinto1(b), 1)
1335        self.assertEqual(b[:1], b"c")
1336        self.assertEqual(rawio._reads, 1)
1337        self.assertEqual(bufio.readinto1(b), 2)
1338        self.assertEqual(b, b"de")
1339        self.assertEqual(rawio._reads, 2)
1340        b = bytearray(2*buffer_size)
1341        self.assertEqual(bufio.peek(3), b'fgh')
1342        self.assertEqual(rawio._reads, 3)
1343        self.assertEqual(bufio.readinto1(b), 6)
1344        self.assertEqual(b[:6], b"fghjkl")
1345        self.assertEqual(rawio._reads, 4)
1346
1347    def test_readinto_array(self):
1348        buffer_size = 60
1349        data = b"a" * 26
1350        rawio = self.MockRawIO((data,))
1351        bufio = self.tp(rawio, buffer_size=buffer_size)
1352
1353        # Create an array with element size > 1 byte
1354        b = array.array('i', b'x' * 32)
1355        assert len(b) != 16
1356
1357        # Read into it. We should get as many *bytes* as we can fit into b
1358        # (which is more than the number of elements)
1359        n = bufio.readinto(b)
1360        self.assertGreater(n, len(b))
1361
1362        # Check that old contents of b are preserved
1363        bm = memoryview(b).cast('B')
1364        self.assertLess(n, len(bm))
1365        self.assertEqual(bm[:n], data[:n])
1366        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1367
1368    def test_readinto1_array(self):
1369        buffer_size = 60
1370        data = b"a" * 26
1371        rawio = self.MockRawIO((data,))
1372        bufio = self.tp(rawio, buffer_size=buffer_size)
1373
1374        # Create an array with element size > 1 byte
1375        b = array.array('i', b'x' * 32)
1376        assert len(b) != 16
1377
1378        # Read into it. We should get as many *bytes* as we can fit into b
1379        # (which is more than the number of elements)
1380        n = bufio.readinto1(b)
1381        self.assertGreater(n, len(b))
1382
1383        # Check that old contents of b are preserved
1384        bm = memoryview(b).cast('B')
1385        self.assertLess(n, len(bm))
1386        self.assertEqual(bm[:n], data[:n])
1387        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1388
1389    def test_readlines(self):
1390        def bufio():
1391            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1392            return self.tp(rawio)
1393        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1394        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1395        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1396
1397    def test_buffering(self):
1398        data = b"abcdefghi"
1399        dlen = len(data)
1400
1401        tests = [
1402            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1403            [ 100, [ 3, 3, 3],     [ dlen ]    ],
1404            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1405        ]
1406
1407        for bufsize, buf_read_sizes, raw_read_sizes in tests:
1408            rawio = self.MockFileIO(data)
1409            bufio = self.tp(rawio, buffer_size=bufsize)
1410            pos = 0
1411            for nbytes in buf_read_sizes:
1412                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1413                pos += nbytes
1414            # this is mildly implementation-dependent
1415            self.assertEqual(rawio.read_history, raw_read_sizes)
1416
1417    def test_read_non_blocking(self):
1418        # Inject some None's in there to simulate EWOULDBLOCK
1419        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1420        bufio = self.tp(rawio)
1421        self.assertEqual(b"abcd", bufio.read(6))
1422        self.assertEqual(b"e", bufio.read(1))
1423        self.assertEqual(b"fg", bufio.read())
1424        self.assertEqual(b"", bufio.peek(1))
1425        self.assertIsNone(bufio.read())
1426        self.assertEqual(b"", bufio.read())
1427
1428        rawio = self.MockRawIO((b"a", None, None))
1429        self.assertEqual(b"a", rawio.readall())
1430        self.assertIsNone(rawio.readall())
1431
1432    def test_read_past_eof(self):
1433        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1434        bufio = self.tp(rawio)
1435
1436        self.assertEqual(b"abcdefg", bufio.read(9000))
1437
1438    def test_read_all(self):
1439        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1440        bufio = self.tp(rawio)
1441
1442        self.assertEqual(b"abcdefg", bufio.read())
1443
1444    @support.requires_resource('cpu')
1445    def test_threads(self):
1446        try:
1447            # Write out many bytes with exactly the same number of 0's,
1448            # 1's... 255's. This will help us check that concurrent reading
1449            # doesn't duplicate or forget contents.
1450            N = 1000
1451            l = list(range(256)) * N
1452            random.shuffle(l)
1453            s = bytes(bytearray(l))
1454            with self.open(support.TESTFN, "wb") as f:
1455                f.write(s)
1456            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
1457                bufio = self.tp(raw, 8)
1458                errors = []
1459                results = []
1460                def f():
1461                    try:
1462                        # Intra-buffer read then buffer-flushing read
1463                        for n in cycle([1, 19]):
1464                            s = bufio.read(n)
1465                            if not s:
1466                                break
1467                            # list.append() is atomic
1468                            results.append(s)
1469                    except Exception as e:
1470                        errors.append(e)
1471                        raise
1472                threads = [threading.Thread(target=f) for x in range(20)]
1473                with support.start_threads(threads):
1474                    time.sleep(0.02) # yield
1475                self.assertFalse(errors,
1476                    "the following exceptions were caught: %r" % errors)
1477                s = b''.join(results)
1478                for i in range(256):
1479                    c = bytes(bytearray([i]))
1480                    self.assertEqual(s.count(c), N)
1481        finally:
1482            support.unlink(support.TESTFN)
1483
1484    def test_unseekable(self):
1485        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1486        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1487        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1488        bufio.read(1)
1489        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1490        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1491
1492    def test_misbehaved_io(self):
1493        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1494        bufio = self.tp(rawio)
1495        self.assertRaises(OSError, bufio.seek, 0)
1496        self.assertRaises(OSError, bufio.tell)
1497
1498        # Silence destructor error
1499        bufio.close = lambda: None
1500
1501    def test_no_extraneous_read(self):
1502        # Issue #9550; when the raw IO object has satisfied the read request,
1503        # we should not issue any additional reads, otherwise it may block
1504        # (e.g. socket).
1505        bufsize = 16
1506        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1507            rawio = self.MockRawIO([b"x" * n])
1508            bufio = self.tp(rawio, bufsize)
1509            self.assertEqual(bufio.read(n), b"x" * n)
1510            # Simple case: one raw read is enough to satisfy the request.
1511            self.assertEqual(rawio._extraneous_reads, 0,
1512                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1513            # A more complex case where two raw reads are needed to satisfy
1514            # the request.
1515            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1516            bufio = self.tp(rawio, bufsize)
1517            self.assertEqual(bufio.read(n), b"x" * n)
1518            self.assertEqual(rawio._extraneous_reads, 0,
1519                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1520
1521    def test_read_on_closed(self):
1522        # Issue #23796
1523        b = io.BufferedReader(io.BytesIO(b"12"))
1524        b.read(1)
1525        b.close()
1526        self.assertRaises(ValueError, b.peek)
1527        self.assertRaises(ValueError, b.read1, 1)
1528
1529
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReader tests against io.BufferedReader and adds
    # checks specific to it.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, read() must raise ValueError
        # as well.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            buffered = self.tp(rawio)
            # Make the object reference itself so only the cycle collector
            # can free it.
            buffered.f = buffered
            ref = weakref.ref(buffered)
            del buffered
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1579
1580
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests against
    # _pyio.BufferedReader.
    tp = pyio.BufferedReader
1583
1584
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Tests shared by every BufferedWriter implementation.

    Subclasses bind ``tp`` to the concrete class under test.  Helper types
    (``MockRawIO``, ``MockNonBlockWriterIO``, ...) and ``open`` are looked
    up on ``self``.
    """
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run on a live object; a rejected buffer size
        # must not prevent a later, valid re-initialization.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.__init__(raw)
        for size in (1024, 16):
            writer.__init__(raw, buffer_size=size)
        self.assertEqual(3, writer.write(b"abc"))
        writer.flush()
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
        writer.__init__(raw)
        self.assertEqual(3, writer.write(b"ghi"))
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Discarding an object that never ran __init__ must be harmless.
        orphan = self.tp.__new__(self.tp)
        del orphan
        # Using one must fail cleanly until __init__ has been called.
        writer = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               writer.write, b'')
        writer.__init__(self.MockRawIO())
        self.assertEqual(writer.write(b''), 0)

    def test_detach_flush(self):
        # detach() has to push buffered bytes to the raw stream.
        sink = self.MockRawIO()
        writer = self.tp(sink)
        writer.write(b"howdy!")
        self.assertFalse(sink._write_stack)
        writer.detach()
        self.assertEqual(sink._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        writer.write(b"abc")
        self.assertFalse(sink._write_stack)
        scratch = bytearray(b"def")
        writer.write(scratch)
        # Clobber our copy: the writer must have taken its own copy already.
        scratch[:] = b"***"
        writer.flush()
        self.assertEqual(b"".join(sink._write_stack), b"abcdef")

    def test_write_overflow(self):
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            writer.write(contents[start:start + 3])
        flushed = b"".join(sink._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every single write.
        payload = bytes(range(256)) * 1000
        sink = self.MockRawIO()
        writer = self.tp(sink, 13)
        # Write sizes: each N is used 15 times before moving on to N+1.
        sizes = (size for size in count(1) for _ in range(15))
        offset = 0
        while offset < len(payload):
            size = min(next(sizes), len(payload) - offset)
            chunk = payload[offset:offset + size]
            self.assertEqual(writer.write(chunk), size)
            intermediate_func(writer)
            offset += size
        writer.flush()
        self.assertEqual(payload, b"".join(sink._write_stack))

    def test_writes(self):
        def _nothing(bufio):
            return None
        self.check_writes(_nothing)

    def test_writes_and_flushes(self):
        def _flush(bufio):
            return bufio.flush()
        self.check_writes(_flush)

    def test_writes_and_seeks(self):
        # Absolute seeks around the current position, ending where we began.
        def _seek_absolute(bufio):
            here = bufio.tell()
            for target in (here + 1, here - 1, here):
                bufio.seek(target, 0)
        self.check_writes(_seek_absolute)

        # Relative seeks around the current position, ending where we began.
        def _seek_relative(bufio):
            here = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(here, 0)
        self.check_writes(_seek_relative)

    def test_writes_and_truncates(self):
        def _truncate_here(bufio):
            return bufio.truncate(bufio.tell())
        self.check_writes(_truncate_here)

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        writer = self.tp(raw, 8)

        self.assertEqual(writer.write(b"abcd"), 4)
        self.assertEqual(writer.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(writer.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            writer.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        writer = self.tp(raw, 4)
        self.assertEqual(writer.write(b"abcdef"), 6)
        self.assertEqual(writer.tell(), 6)
        writer.seek(0, 0)
        self.assertEqual(writer.write(b"XY"), 2)
        writer.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(writer.write(b"123456"), 6)
        writer.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        writer.write(b"abc")
        writer.flush()
        self.assertEqual(b"abc", sink._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(sink._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a non-list sequence type.
        lines = UserList([b'ab', b'cd', b'ef'])
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(sink._write_stack), b'abcdef')

    def test_writelines_error(self):
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            with self.assertRaises(TypeError):
                writer.writelines(bad_lines)

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        sink = self.MockRawIO()
        writer = self.tp(sink, 8)
        writer.write(b"abc")
        del writer
        support.gc_collect()
        self.assertEqual(b"abc", sink._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            writer = self.tp(raw, 8)
            writer.write(b"abcdef")
            self.assertEqual(writer.truncate(3), 3)
            self.assertEqual(writer.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Pre-fill the file so the later reads have data to return.
            f.write(b'\x00' * 10000)
        for buffer_size in (8192, 4096, 200):
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            payload = bytes(range(256)) * N
            sizes = cycle([1, 19])
            pending = deque()
            offset = 0
            while offset < len(payload):
                size = next(sizes)
                pending.append(payload[offset:offset + size])
                offset += size
            del payload
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                writer = self.tp(raw, 8)
                errors = []

                def worker():
                    # Drain the shared queue; record (then re-raise) any
                    # failure so the main thread can report it.
                    try:
                        while True:
                            try:
                                chunk = pending.popleft()
                            except IndexError:
                                return
                            writer.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                writer.close()
            with self.open(support.TESTFN, "rb") as f:
                data = f.read()
            # Chunks may land in any order, so only per-byte counts are stable.
            for value in range(256):
                self.assertEqual(data.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw = self.MisbehavedRawIO()
        writer = self.tp(raw, 5)
        self.assertRaises(OSError, writer.seek, 0)
        self.assertRaises(OSError, writer.tell)
        self.assertRaises(OSError, writer.write, b"abcdef")

        # Silence destructor error
        writer.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def failing_write(data):
            raise OSError()

        raw.write = failing_write
        writer = self.tp(raw)
        writer.write(b'spam')
        self.assertRaises(OSError, writer.close) # exception not swallowed
        self.assertTrue(writer.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        raw = self.SlowFlushRawIO()
        writer = self.tp(raw, 8)
        closer = threading.Thread(target=writer.close)
        closer.start()
        try:
            # Wait until close() is inside the raw flush before poking at
            # the writer from this thread.
            raw.in_flush.wait()
            self.assertRaises(ValueError, writer.write, b'spam')
            self.assertTrue(writer.closed)
        finally:
            # Always reap the helper thread, even when an assertion above
            # fails, so it cannot outlive the test.
            closer.join()
1876
1877
1878
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    """Run the BufferedWriter tests against the C implementation."""
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After a rejected __init__, the object must refuse to write.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Self-reference: only the cycle collector can free the object.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError message has to mention the class being constructed.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1925
1926
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against the pure-Python implementation."""
    tp = pyio.BufferedWriter
1929
class BufferedRWPairTest(unittest.TestCase):
    """Tests shared by every BufferedRWPair implementation.

    Subclasses bind ``tp`` to the concrete class under test.
    """

    def test_constructor(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.closed)

    def test_uninitialized(self):
        # Discarding an object that never ran __init__ must be harmless.
        orphan = self.tp.__new__(self.tp)
        del orphan
        # Using one must fail cleanly until __init__ has been called.
        rw = self.tp.__new__(self.tp)
        expected_types = (ValueError, AttributeError)
        expected_text = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_types, expected_text, rw.read, 0)
        self.assertRaisesRegex(expected_types, expected_text, rw.write, b'')
        rw.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw.read(0), b'')
        self.assertEqual(rw.write(b''), 0)

    def test_detach(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, rw.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for args, expected in [((3,), b"abc"), ((1,), b"d"), ((), b"ef")]:
            self.assertEqual(rw.read(*args), expected)
        # An explicit None size reads everything.
        rw = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw.read1(3), b"abc")
        self.assertEqual(rw.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                target = byteslike(b'\0' * 5)
                fill = getattr(rw, method)
                self.assertEqual(fill(target), 5)
                self.assertEqual(bytes(target), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw = self.tp(self.MockRawIO(), sink)

        rw.write(b"abc")
        rw.flush()
        scratch = bytearray(b"def")
        rw.write(scratch)
        # Clobber our copy: the pair must have taken its own copy already.
        scratch[:] = b"***"
        rw.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not consume: the following read sees the same bytes.
        self.assertTrue(rw.peek(3).startswith(b"abc"))
        self.assertEqual(rw.read(3), b"abc")

    def test_readable(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw.readable())

    def test_writeable(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.closed)
        rw.close()
        self.assertTrue(rw.closed)

    def test_reader_close_error_on_close(self):
        def failing_close():
            # Deliberately undefined name: evaluating it raises NameError.
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = failing_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

        # Silence destructor error
        reader.close = lambda: None

    def test_writer_close_error_on_close(self):
        def failing_close():
            # Deliberately undefined name: evaluating it raises NameError.
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = failing_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
            # Ignore BufferedRWPair unraisable exception
            with support.catch_unraisable_exception():
                pair = None
                support.gc_collect()
            support.gc_collect()

    def test_reader_writer_close_error_on_close(self):
        def failing_reader_close():
            # Deliberately undefined name: evaluating it raises NameError.
            reader_non_existing
        def failing_writer_close():
            # Deliberately undefined name: evaluating it raises NameError.
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        # The reader's error surfaces, with the writer's error chained to it.
        outer = caught.exception
        self.assertIn('reader_non_existing', str(outer))
        self.assertIsInstance(outer.__context__, NameError)
        self.assertIn('writer_non_existing', str(outer.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        reader.close = lambda: None
        writer.close = lambda: None

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty() truth)
        scenarios = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in scenarios:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(pair.isatty())

    def test_weakref_clearing(self):
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None # Shouldn't segfault.
2137
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the C implementation."""
    tp = io.BufferedRWPair
2140
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the pure-Python implementation."""
    tp = pyio.BufferedRWPair
2143
2144
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests shared by every BufferedRandom implementation.

    Inherits the reader and writer suites and adds tests that mix reads,
    writes and seeks on one object.  Subclasses bind ``tp`` to the concrete
    class under test.
    """
    read_mode = "rb+"
    write_mode = "wb+"

    # Both bases define the next two tests, so run each version explicitly.

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the bytes it consumed from bufio.
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use an oversized buffer.
            capacity = n if n >= 0 else 9999
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            seen = bufio.peek(n)
            if n != -1:
                seen = seen[:n]
            # peek() does not advance, so skip over what was returned.
            bufio.seek(len(seen), 1)
            return seen
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        # Peek at the current position after every write.
        def _peek_here(bufio):
            bufio.peek(1)
        self.check_writes(_peek_here)

        # Peek one byte back, then restore the position.
        def _peek_behind(bufio):
            here = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(here, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        # Re-read the last written byte after every write.
        def _reread(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread)

    def test_writes_and_read1s(self):
        # Same as test_writes_and_reads, using read1().
        def _reread1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread1)

    def test_writes_and_readintos(self):
        # Same as test_writes_and_reads, using readinto().
        def _rereadinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_rereadinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            raw = self.BytesIO(b"A" * 10)
            stream = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            end = overwrite_size + 1
            self.assertEqual(stream.tell(), end)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                stream = self.tp(raw, 100)
                mutate(stream, i, j)
                stream.flush()
                expected = bytearray(original)
                # Order matters: when i == j the second write wins.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        stream = self.tp(raw, 100)
        self.assertEqual(stream.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            # Each write overwrites one byte; readline() returns the rest
            # of that line.
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for written, rest_of_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), rest_of_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2372
2373
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against the C implementation."""
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        stream = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            stream.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the C reader and C writer versions of this test in turn.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # The TypeError message has to mention the class being constructed.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2397
2398
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against the pure-Python implementation."""
    tp = pyio.BufferedRandom
2401
2402
2403# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2404# properties:
2405#   - A single output character can correspond to many bytes of input.
2406#   - The number of input bytes to complete the character can be
2407#     undetermined until the last input byte is received.
2408#   - The number of input bytes can vary depending on previous input.
2409#   - A single input byte can correspond to many characters of output.
2410#   - The number of output characters can be undetermined until the
2411#     last input byte is received.
2412#   - The number of output characters can vary depending on previous input.
2413
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I applies to the words that
    follow it; the buffer is always empty once a word has been processed.
    EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), with I and O packed into flags."""
        # XOR with 1 so that flags == 0 after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        # Named "flags" rather than "io" so the io module is not shadowed.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of completed words.

        With *final* true, a partially buffered word is processed as well.
        """
        pieces = []
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    # An empty buffer means an extra period: ignore it.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i' and 'o' words only update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then truncate, to exactly O characters.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Truthy only while a test wants 'test_decoder' to be resolvable.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for an enabled 'test_decoder'.

        Returns a CodecInfo whose incremental decoder is this class, or
        None for any other name or while codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2498
# Register the lookup function of the decoder defined above.
# The lookup only answers for the name 'test_decoder', and only while
# StatefulIncrementalDecoder.codecEnabled is true, so it is disabled by
# default; tests enable it explicitly.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2502
2503
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each with a fresh decoder.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
2546
2547class TextIOWrapperTest(unittest.TestCase):
2548
2549    def setUp(self):
2550        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2551        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2552        support.unlink(support.TESTFN)
2553
2554    def tearDown(self):
2555        support.unlink(support.TESTFN)
2556
2557    def test_constructor(self):
2558        r = self.BytesIO(b"\xc3\xa9\n\n")
2559        b = self.BufferedReader(r, 1000)
2560        t = self.TextIOWrapper(b)
2561        t.__init__(b, encoding="latin-1", newline="\r\n")
2562        self.assertEqual(t.encoding, "latin-1")
2563        self.assertEqual(t.line_buffering, False)
2564        t.__init__(b, encoding="utf-8", line_buffering=True)
2565        self.assertEqual(t.encoding, "utf-8")
2566        self.assertEqual(t.line_buffering, True)
2567        self.assertEqual("\xe9\n", t.readline())
2568        self.assertRaises(TypeError, t.__init__, b, newline=42)
2569        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2570
2571    def test_uninitialized(self):
2572        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2573        del t
2574        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2575        self.assertRaises(Exception, repr, t)
2576        self.assertRaisesRegex((ValueError, AttributeError),
2577                               'uninitialized|has no attribute',
2578                               t.read, 0)
2579        t.__init__(self.MockRawIO())
2580        self.assertEqual(t.read(0), '')
2581
2582    def test_non_text_encoding_codecs_are_rejected(self):
2583        # Ensure the constructor complains if passed a codec that isn't
2584        # marked as a text encoding
2585        # http://bugs.python.org/issue20404
2586        r = self.BytesIO()
2587        b = self.BufferedWriter(r)
2588        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2589            self.TextIOWrapper(b, encoding="hex")
2590
2591    def test_detach(self):
2592        r = self.BytesIO()
2593        b = self.BufferedWriter(r)
2594        t = self.TextIOWrapper(b)
2595        self.assertIs(t.detach(), b)
2596
2597        t = self.TextIOWrapper(b, encoding="ascii")
2598        t.write("howdy")
2599        self.assertFalse(r.getvalue())
2600        t.detach()
2601        self.assertEqual(r.getvalue(), b"howdy")
2602        self.assertRaises(ValueError, t.detach)
2603
2604        # Operations independent of the detached stream should still work
2605        repr(t)
2606        self.assertEqual(t.encoding, "ascii")
2607        self.assertEqual(t.errors, "strict")
2608        self.assertFalse(t.line_buffering)
2609        self.assertFalse(t.write_through)
2610
2611    def test_repr(self):
2612        raw = self.BytesIO("hello".encode("utf-8"))
2613        b = self.BufferedReader(raw)
2614        t = self.TextIOWrapper(b, encoding="utf-8")
2615        modname = self.TextIOWrapper.__module__
2616        self.assertRegex(repr(t),
2617                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2618        raw.name = "dummy"
2619        self.assertRegex(repr(t),
2620                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2621        t.mode = "r"
2622        self.assertRegex(repr(t),
2623                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2624        raw.name = b"dummy"
2625        self.assertRegex(repr(t),
2626                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2627
2628        t.buffer.detach()
2629        repr(t)  # Should not raise an exception
2630
2631    def test_recursive_repr(self):
2632        # Issue #25455
2633        raw = self.BytesIO()
2634        t = self.TextIOWrapper(raw)
2635        with support.swap_attr(raw, 'name', t):
2636            try:
2637                repr(t)  # Should not crash
2638            except RuntimeError:
2639                pass
2640
2641    def test_line_buffering(self):
2642        r = self.BytesIO()
2643        b = self.BufferedWriter(r, 1000)
2644        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2645        t.write("X")
2646        self.assertEqual(r.getvalue(), b"")  # No flush happened
2647        t.write("Y\nZ")
2648        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2649        t.write("A\rB")
2650        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2651
2652    def test_reconfigure_line_buffering(self):
2653        r = self.BytesIO()
2654        b = self.BufferedWriter(r, 1000)
2655        t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2656        t.write("AB\nC")
2657        self.assertEqual(r.getvalue(), b"")
2658
2659        t.reconfigure(line_buffering=True)   # implicit flush
2660        self.assertEqual(r.getvalue(), b"AB\nC")
2661        t.write("DEF\nG")
2662        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2663        t.write("H")
2664        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2665        t.reconfigure(line_buffering=False)   # implicit flush
2666        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2667        t.write("IJ")
2668        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2669
2670        # Keeping default value
2671        t.reconfigure()
2672        t.reconfigure(line_buffering=None)
2673        self.assertEqual(t.line_buffering, False)
2674        t.reconfigure(line_buffering=True)
2675        t.reconfigure()
2676        t.reconfigure(line_buffering=None)
2677        self.assertEqual(t.line_buffering, True)
2678
2679    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2680    def test_default_encoding(self):
2681        old_environ = dict(os.environ)
2682        try:
2683            # try to get a user preferred encoding different than the current
2684            # locale encoding to check that TextIOWrapper() uses the current
2685            # locale encoding and not the user preferred encoding
2686            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2687                if key in os.environ:
2688                    del os.environ[key]
2689
2690            current_locale_encoding = locale.getpreferredencoding(False)
2691            b = self.BytesIO()
2692            t = self.TextIOWrapper(b)
2693            self.assertEqual(t.encoding, current_locale_encoding)
2694        finally:
2695            os.environ.clear()
2696            os.environ.update(old_environ)
2697
2698    @support.cpython_only
2699    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2700    def test_device_encoding(self):
2701        # Issue 15989
2702        import _testcapi
2703        b = self.BytesIO()
2704        b.fileno = lambda: _testcapi.INT_MAX + 1
2705        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2706        b.fileno = lambda: _testcapi.UINT_MAX + 1
2707        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2708
2709    def test_encoding(self):
2710        # Check the encoding attribute is always set, and valid
2711        b = self.BytesIO()
2712        t = self.TextIOWrapper(b, encoding="utf-8")
2713        self.assertEqual(t.encoding, "utf-8")
2714        t = self.TextIOWrapper(b)
2715        self.assertIsNotNone(t.encoding)
2716        codecs.lookup(t.encoding)
2717
2718    def test_encoding_errors_reading(self):
2719        # (1) default
2720        b = self.BytesIO(b"abc\n\xff\n")
2721        t = self.TextIOWrapper(b, encoding="ascii")
2722        self.assertRaises(UnicodeError, t.read)
2723        # (2) explicit strict
2724        b = self.BytesIO(b"abc\n\xff\n")
2725        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2726        self.assertRaises(UnicodeError, t.read)
2727        # (3) ignore
2728        b = self.BytesIO(b"abc\n\xff\n")
2729        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2730        self.assertEqual(t.read(), "abc\n\n")
2731        # (4) replace
2732        b = self.BytesIO(b"abc\n\xff\n")
2733        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2734        self.assertEqual(t.read(), "abc\n\ufffd\n")
2735
2736    def test_encoding_errors_writing(self):
2737        # (1) default
2738        b = self.BytesIO()
2739        t = self.TextIOWrapper(b, encoding="ascii")
2740        self.assertRaises(UnicodeError, t.write, "\xff")
2741        # (2) explicit strict
2742        b = self.BytesIO()
2743        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2744        self.assertRaises(UnicodeError, t.write, "\xff")
2745        # (3) ignore
2746        b = self.BytesIO()
2747        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2748                             newline="\n")
2749        t.write("abc\xffdef\n")
2750        t.flush()
2751        self.assertEqual(b.getvalue(), b"abcdef\n")
2752        # (4) replace
2753        b = self.BytesIO()
2754        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2755                             newline="\n")
2756        t.write("abc\xffdef\n")
2757        t.flush()
2758        self.assertEqual(b.getvalue(), b"abc?def\n")
2759
2760    def test_newlines(self):
2761        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2762
2763        tests = [
2764            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2765            [ '', input_lines ],
2766            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2767            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2768            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2769        ]
2770        encodings = (
2771            'utf-8', 'latin-1',
2772            'utf-16', 'utf-16-le', 'utf-16-be',
2773            'utf-32', 'utf-32-le', 'utf-32-be',
2774        )
2775
2776        # Try a range of buffer sizes to test the case where \r is the last
2777        # character in TextIOWrapper._pending_line.
2778        for encoding in encodings:
2779            # XXX: str.encode() should return bytes
2780            data = bytes(''.join(input_lines).encode(encoding))
2781            for do_reads in (False, True):
2782                for bufsize in range(1, 10):
2783                    for newline, exp_lines in tests:
2784                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2785                        textio = self.TextIOWrapper(bufio, newline=newline,
2786                                                  encoding=encoding)
2787                        if do_reads:
2788                            got_lines = []
2789                            while True:
2790                                c2 = textio.read(2)
2791                                if c2 == '':
2792                                    break
2793                                self.assertEqual(len(c2), 2)
2794                                got_lines.append(c2 + textio.readline())
2795                        else:
2796                            got_lines = list(textio)
2797
2798                        for got_line, exp_line in zip(got_lines, exp_lines):
2799                            self.assertEqual(got_line, exp_line)
2800                        self.assertEqual(len(got_lines), len(exp_lines))
2801
2802    def test_newlines_input(self):
2803        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2804        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2805        for newline, expected in [
2806            (None, normalized.decode("ascii").splitlines(keepends=True)),
2807            ("", testdata.decode("ascii").splitlines(keepends=True)),
2808            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2809            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2810            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2811            ]:
2812            buf = self.BytesIO(testdata)
2813            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2814            self.assertEqual(txt.readlines(), expected)
2815            txt.seek(0)
2816            self.assertEqual(txt.read(), "".join(expected))
2817
2818    def test_newlines_output(self):
2819        testdict = {
2820            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2821            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2822            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2823            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2824            }
2825        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2826        for newline, expected in tests:
2827            buf = self.BytesIO()
2828            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2829            txt.write("AAA\nB")
2830            txt.write("BB\nCCC\n")
2831            txt.write("X\rY\r\nZ")
2832            txt.flush()
2833            self.assertEqual(buf.closed, False)
2834            self.assertEqual(buf.getvalue(), expected)
2835
2836    def test_destructor(self):
2837        l = []
2838        base = self.BytesIO
2839        class MyBytesIO(base):
2840            def close(self):
2841                l.append(self.getvalue())
2842                base.close(self)
2843        b = MyBytesIO()
2844        t = self.TextIOWrapper(b, encoding="ascii")
2845        t.write("abc")
2846        del t
2847        support.gc_collect()
2848        self.assertEqual([b"abc"], l)
2849
2850    def test_override_destructor(self):
2851        record = []
2852        class MyTextIO(self.TextIOWrapper):
2853            def __del__(self):
2854                record.append(1)
2855                try:
2856                    f = super().__del__
2857                except AttributeError:
2858                    pass
2859                else:
2860                    f()
2861            def close(self):
2862                record.append(2)
2863                super().close()
2864            def flush(self):
2865                record.append(3)
2866                super().flush()
2867        b = self.BytesIO()
2868        t = MyTextIO(b, encoding="ascii")
2869        del t
2870        support.gc_collect()
2871        self.assertEqual(record, [1, 2, 3])
2872
2873    def test_error_through_destructor(self):
2874        # Test that the exception state is not modified by a destructor,
2875        # even if close() fails.
2876        rawio = self.CloseFailureIO()
2877        with support.catch_unraisable_exception() as cm:
2878            with self.assertRaises(AttributeError):
2879                self.TextIOWrapper(rawio).xyzzy
2880
2881            if not IOBASE_EMITS_UNRAISABLE:
2882                self.assertIsNone(cm.unraisable)
2883            elif cm.unraisable is not None:
2884                self.assertEqual(cm.unraisable.exc_type, OSError)
2885
2886    # Systematic tests of the text I/O API
2887
2888    def test_basic_io(self):
2889        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2890            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2891                f = self.open(support.TESTFN, "w+", encoding=enc)
2892                f._CHUNK_SIZE = chunksize
2893                self.assertEqual(f.write("abc"), 3)
2894                f.close()
2895                f = self.open(support.TESTFN, "r+", encoding=enc)
2896                f._CHUNK_SIZE = chunksize
2897                self.assertEqual(f.tell(), 0)
2898                self.assertEqual(f.read(), "abc")
2899                cookie = f.tell()
2900                self.assertEqual(f.seek(0), 0)
2901                self.assertEqual(f.read(None), "abc")
2902                f.seek(0)
2903                self.assertEqual(f.read(2), "ab")
2904                self.assertEqual(f.read(1), "c")
2905                self.assertEqual(f.read(1), "")
2906                self.assertEqual(f.read(), "")
2907                self.assertEqual(f.tell(), cookie)
2908                self.assertEqual(f.seek(0), 0)
2909                self.assertEqual(f.seek(0, 2), cookie)
2910                self.assertEqual(f.write("def"), 3)
2911                self.assertEqual(f.seek(cookie), cookie)
2912                self.assertEqual(f.read(), "def")
2913                if enc.startswith("utf"):
2914                    self.multi_line_test(f, enc)
2915                f.close()
2916
2917    def multi_line_test(self, f, enc):
2918        f.seek(0)
2919        f.truncate()
2920        sample = "s\xff\u0fff\uffff"
2921        wlines = []
2922        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2923            chars = []
2924            for i in range(size):
2925                chars.append(sample[i % len(sample)])
2926            line = "".join(chars) + "\n"
2927            wlines.append((f.tell(), line))
2928            f.write(line)
2929        f.seek(0)
2930        rlines = []
2931        while True:
2932            pos = f.tell()
2933            line = f.readline()
2934            if not line:
2935                break
2936            rlines.append((pos, line))
2937        self.assertEqual(rlines, wlines)
2938
2939    def test_telling(self):
2940        f = self.open(support.TESTFN, "w+", encoding="utf-8")
2941        p0 = f.tell()
2942        f.write("\xff\n")
2943        p1 = f.tell()
2944        f.write("\xff\n")
2945        p2 = f.tell()
2946        f.seek(0)
2947        self.assertEqual(f.tell(), p0)
2948        self.assertEqual(f.readline(), "\xff\n")
2949        self.assertEqual(f.tell(), p1)
2950        self.assertEqual(f.readline(), "\xff\n")
2951        self.assertEqual(f.tell(), p2)
2952        f.seek(0)
2953        for line in f:
2954            self.assertEqual(line, "\xff\n")
2955            self.assertRaises(OSError, f.tell)
2956        self.assertEqual(f.tell(), p2)
2957        f.close()
2958
2959    def test_seeking(self):
2960        chunk_size = _default_chunk_size()
2961        prefix_size = chunk_size - 2
2962        u_prefix = "a" * prefix_size
2963        prefix = bytes(u_prefix.encode("utf-8"))
2964        self.assertEqual(len(u_prefix), len(prefix))
2965        u_suffix = "\u8888\n"
2966        suffix = bytes(u_suffix.encode("utf-8"))
2967        line = prefix + suffix
2968        with self.open(support.TESTFN, "wb") as f:
2969            f.write(line*2)
2970        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2971            s = f.read(prefix_size)
2972            self.assertEqual(s, str(prefix, "ascii"))
2973            self.assertEqual(f.tell(), prefix_size)
2974            self.assertEqual(f.readline(), u_suffix)
2975
2976    def test_seeking_too(self):
2977        # Regression test for a specific bug
2978        data = b'\xe0\xbf\xbf\n'
2979        with self.open(support.TESTFN, "wb") as f:
2980            f.write(data)
2981        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2982            f._CHUNK_SIZE  # Just test that it exists
2983            f._CHUNK_SIZE = 2
2984            f.readline()
2985            f.tell()
2986
2987    def test_seek_and_tell(self):
2988        #Test seek/tell using the StatefulIncrementalDecoder.
2989        # Make test faster by doing smaller seeks
2990        CHUNK_SIZE = 128
2991
2992        def test_seek_and_tell_with_data(data, min_pos=0):
2993            """Tell/seek to various points within a data stream and ensure
2994            that the decoded data returned by read() is consistent."""
2995            f = self.open(support.TESTFN, 'wb')
2996            f.write(data)
2997            f.close()
2998            f = self.open(support.TESTFN, encoding='test_decoder')
2999            f._CHUNK_SIZE = CHUNK_SIZE
3000            decoded = f.read()
3001            f.close()
3002
3003            for i in range(min_pos, len(decoded) + 1): # seek positions
3004                for j in [1, 5, len(decoded) - i]: # read lengths
3005                    f = self.open(support.TESTFN, encoding='test_decoder')
3006                    self.assertEqual(f.read(i), decoded[:i])
3007                    cookie = f.tell()
3008                    self.assertEqual(f.read(j), decoded[i:i + j])
3009                    f.seek(cookie)
3010                    self.assertEqual(f.read(), decoded[i:])
3011                    f.close()
3012
3013        # Enable the test decoder.
3014        StatefulIncrementalDecoder.codecEnabled = 1
3015
3016        # Run the tests.
3017        try:
3018            # Try each test case.
3019            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3020                test_seek_and_tell_with_data(input)
3021
3022            # Position each test case so that it crosses a chunk boundary.
3023            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3024                offset = CHUNK_SIZE - len(input)//2
3025                prefix = b'.'*offset
3026                # Don't bother seeking into the prefix (takes too long).
3027                min_pos = offset*2
3028                test_seek_and_tell_with_data(prefix + input, min_pos)
3029
3030        # Ensure our test decoder won't interfere with subsequent tests.
3031        finally:
3032            StatefulIncrementalDecoder.codecEnabled = 0
3033
3034    def test_multibyte_seek_and_tell(self):
3035        f = self.open(support.TESTFN, "w", encoding="euc_jp")
3036        f.write("AB\n\u3046\u3048\n")
3037        f.close()
3038
3039        f = self.open(support.TESTFN, "r", encoding="euc_jp")
3040        self.assertEqual(f.readline(), "AB\n")
3041        p0 = f.tell()
3042        self.assertEqual(f.readline(), "\u3046\u3048\n")
3043        p1 = f.tell()
3044        f.seek(p0)
3045        self.assertEqual(f.readline(), "\u3046\u3048\n")
3046        self.assertEqual(f.tell(), p1)
3047        f.close()
3048
3049    def test_seek_with_encoder_state(self):
3050        f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
3051        f.write("\u00e6\u0300")
3052        p0 = f.tell()
3053        f.write("\u00e6")
3054        f.seek(p0)
3055        f.write("\u0300")
3056        f.close()
3057
3058        f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
3059        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3060        f.close()
3061
3062    def test_encoded_writes(self):
3063        data = "1234567890"
3064        tests = ("utf-16",
3065                 "utf-16-le",
3066                 "utf-16-be",
3067                 "utf-32",
3068                 "utf-32-le",
3069                 "utf-32-be")
3070        for encoding in tests:
3071            buf = self.BytesIO()
3072            f = self.TextIOWrapper(buf, encoding=encoding)
3073            # Check if the BOM is written only once (see issue1753).
3074            f.write(data)
3075            f.write(data)
3076            f.seek(0)
3077            self.assertEqual(f.read(), data * 2)
3078            f.seek(0)
3079            self.assertEqual(f.read(), data * 2)
3080            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3081
3082    def test_unreadable(self):
3083        class UnReadable(self.BytesIO):
3084            def readable(self):
3085                return False
3086        txt = self.TextIOWrapper(UnReadable())
3087        self.assertRaises(OSError, txt.read)
3088
3089    def test_read_one_by_one(self):
3090        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
3091        reads = ""
3092        while True:
3093            c = txt.read(1)
3094            if not c:
3095                break
3096            reads += c
3097        self.assertEqual(reads, "AA\nBB")
3098
3099    def test_readlines(self):
3100        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3101        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3102        txt.seek(0)
3103        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3104        txt.seek(0)
3105        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3106
    # Read in amounts of 128 characters.  (The test does not set
    # _CHUNK_SIZE itself, so 128 need not be the wrapper's actual chunk size.)
3108    def test_read_by_chunk(self):
3109        # make sure "\r\n" straddles 128 char boundary.
3110        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
3111        reads = ""
3112        while True:
3113            c = txt.read(128)
3114            if not c:
3115                break
3116            reads += c
3117        self.assertEqual(reads, "A"*127+"\nB")
3118
3119    def test_writelines(self):
3120        l = ['ab', 'cd', 'ef']
3121        buf = self.BytesIO()
3122        txt = self.TextIOWrapper(buf)
3123        txt.writelines(l)
3124        txt.flush()
3125        self.assertEqual(buf.getvalue(), b'abcdef')
3126
3127    def test_writelines_userlist(self):
3128        l = UserList(['ab', 'cd', 'ef'])
3129        buf = self.BytesIO()
3130        txt = self.TextIOWrapper(buf)
3131        txt.writelines(l)
3132        txt.flush()
3133        self.assertEqual(buf.getvalue(), b'abcdef')
3134
3135    def test_writelines_error(self):
3136        txt = self.TextIOWrapper(self.BytesIO())
3137        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3138        self.assertRaises(TypeError, txt.writelines, None)
3139        self.assertRaises(TypeError, txt.writelines, b'abc')
3140
3141    def test_issue1395_1(self):
3142        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3143
3144        # read one char at a time
3145        reads = ""
3146        while True:
3147            c = txt.read(1)
3148            if not c:
3149                break
3150            reads += c
3151        self.assertEqual(reads, self.normalized)
3152
3153    def test_issue1395_2(self):
3154        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3155        txt._CHUNK_SIZE = 4
3156
3157        reads = ""
3158        while True:
3159            c = txt.read(4)
3160            if not c:
3161                break
3162            reads += c
3163        self.assertEqual(reads, self.normalized)
3164
3165    def test_issue1395_3(self):
3166        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3167        txt._CHUNK_SIZE = 4
3168
3169        reads = txt.read(4)
3170        reads += txt.read(4)
3171        reads += txt.readline()
3172        reads += txt.readline()
3173        reads += txt.readline()
3174        self.assertEqual(reads, self.normalized)
3175
3176    def test_issue1395_4(self):
3177        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3178        txt._CHUNK_SIZE = 4
3179
3180        reads = txt.read(4)
3181        reads += txt.read()
3182        self.assertEqual(reads, self.normalized)
3183
3184    def test_issue1395_5(self):
3185        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3186        txt._CHUNK_SIZE = 4
3187
3188        reads = txt.read(4)
3189        pos = txt.tell()
3190        txt.seek(0)
3191        txt.seek(pos)
3192        self.assertEqual(txt.read(4), "BBB\n")
3193
3194    def test_issue2282(self):
3195        buffer = self.BytesIO(self.testdata)
3196        txt = self.TextIOWrapper(buffer, encoding="ascii")
3197
3198        self.assertEqual(buffer.seekable(), txt.seekable())
3199
3200    def test_append_bom(self):
3201        # The BOM is not written again when appending to a non-empty file
3202        filename = support.TESTFN
3203        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3204            with self.open(filename, 'w', encoding=charset) as f:
3205                f.write('aaa')
3206                pos = f.tell()
3207            with self.open(filename, 'rb') as f:
3208                self.assertEqual(f.read(), 'aaa'.encode(charset))
3209
3210            with self.open(filename, 'a', encoding=charset) as f:
3211                f.write('xxx')
3212            with self.open(filename, 'rb') as f:
3213                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3214
3215    def test_seek_bom(self):
3216        # Same test, but when seeking manually
3217        filename = support.TESTFN
3218        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3219            with self.open(filename, 'w', encoding=charset) as f:
3220                f.write('aaa')
3221                pos = f.tell()
3222            with self.open(filename, 'r+', encoding=charset) as f:
3223                f.seek(pos)
3224                f.write('zzz')
3225                f.seek(0)
3226                f.write('bbb')
3227            with self.open(filename, 'rb') as f:
3228                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3229
3230    def test_seek_append_bom(self):
3231        # Same test, but first seek to the start and then to the end
3232        filename = support.TESTFN
3233        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3234            with self.open(filename, 'w', encoding=charset) as f:
3235                f.write('aaa')
3236            with self.open(filename, 'a', encoding=charset) as f:
3237                f.seek(0)
3238                f.seek(0, self.SEEK_END)
3239                f.write('xxx')
3240            with self.open(filename, 'rb') as f:
3241                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3242
3243    def test_errors_property(self):
3244        with self.open(support.TESTFN, "w") as f:
3245            self.assertEqual(f.errors, "strict")
3246        with self.open(support.TESTFN, "w", errors="replace") as f:
3247            self.assertEqual(f.errors, "replace")
3248
3249    @support.no_tracing
3250    def test_threads_write(self):
3251        # Issue6750: concurrent writes could duplicate data
3252        event = threading.Event()
3253        with self.open(support.TESTFN, "w", buffering=1) as f:
3254            def run(n):
3255                text = "Thread%03d\n" % n
3256                event.wait()
3257                f.write(text)
3258            threads = [threading.Thread(target=run, args=(x,))
3259                       for x in range(20)]
3260            with support.start_threads(threads, event.set):
3261                time.sleep(0.02)
3262        with self.open(support.TESTFN) as f:
3263            content = f.read()
3264            for n in range(20):
3265                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3266
    def test_flush_error_on_close(self):
        # Test that text file is closed despite failed flush
        # and that flush() is called before file closed.
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        # Filled in by bad_flush() with the "closed" flags observed at the
        # moment flush() runs; stays empty if flush() is never called.
        closed = []
        def bad_flush():
            closed[:] = [txt.closed, txt.buffer.closed]
            raise OSError()
        # Shadow the wrapper's flush() with the failing stand-in.
        txt.flush = bad_flush
        self.assertRaises(OSError, txt.close) # exception not swallowed
        self.assertTrue(txt.closed)
        self.assertTrue(txt.buffer.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        txt.flush = lambda: None  # break reference loop
3283
    def test_close_error_on_close(self):
        # Both the wrapper's flush() and the buffer's close() are replaced
        # with functions that raise OSError.  close() must propagate the
        # 'close' error, chain the 'flush' error as its __context__, and
        # leave the wrapper reporting closed == False.
        buffer = self.BytesIO(self.testdata)
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        buffer.close = bad_close
        txt = self.TextIOWrapper(buffer, encoding="ascii")
        txt.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            txt.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(txt.closed)

        # Silence destructor error
        buffer.close = lambda: None
        txt.flush = lambda: None
3303
    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same scenario as a failing flush() plus a failing close(), but the
        # stand-ins fail by referencing undefined names, so the exceptions
        # are NameErrors raised by the interpreter rather than explicitly
        # constructed exception instances.
        buffer = self.BytesIO(self.testdata)
        def bad_flush():
            # Deliberately undefined name: evaluating it raises NameError.
            raise non_existing_flush
        def bad_close():
            # Deliberately undefined name: evaluating it raises NameError.
            raise non_existing_close
        buffer.close = bad_close
        txt = self.TextIOWrapper(buffer, encoding="ascii")
        txt.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            txt.close()
        # The close error is the one raised; the flush error is chained.
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(txt.closed)

        # Silence destructor error
        buffer.close = lambda: None
        txt.flush = lambda: None
3324
3325    def test_multi_close(self):
3326        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3327        txt.close()
3328        txt.close()
3329        txt.close()
3330        self.assertRaises(ValueError, txt.flush)
3331
3332    def test_unseekable(self):
3333        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3334        self.assertRaises(self.UnsupportedOperation, txt.tell)
3335        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3336
3337    def test_readonly_attributes(self):
3338        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3339        buf = self.BytesIO(self.testdata)
3340        with self.assertRaises(AttributeError):
3341            txt.buffer = buf
3342
3343    def test_rawio(self):
3344        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3345        # that subprocess.Popen() can have the required unbuffered
3346        # semantics with universal_newlines=True.
3347        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3348        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3349        # Reads
3350        self.assertEqual(txt.read(4), 'abcd')
3351        self.assertEqual(txt.readline(), 'efghi\n')
3352        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3353
3354    def test_rawio_write_through(self):
3355        # Issue #12591: with write_through=True, writes don't need a flush
3356        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3357        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3358                                 write_through=True)
3359        txt.write('1')
3360        txt.write('23\n4')
3361        txt.write('5')
3362        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3363
    def test_bufio_write_through(self):
        # Issue #21396: write_through=True doesn't force a flush()
        # on the underlying binary buffered object.
        # The two lists record calls made to the instrumented writer below.
        flush_called, write_called = [], []
        class BufferedWriter(self.BufferedWriter):
            # Subclass of the BufferedWriter under test that logs every
            # flush() and write() before delegating to the real method.
            def flush(self, *args, **kwargs):
                flush_called.append(True)
                return super().flush(*args, **kwargs)
            def write(self, *args, **kwargs):
                write_called.append(True)
                return super().write(*args, **kwargs)

        rawio = self.BytesIO()
        data = b"a"
        # Buffer size is twice the payload, so one write fits in the buffer.
        bufio = BufferedWriter(rawio, len(data)*2)
        textio = self.TextIOWrapper(bufio, encoding='ascii',
                                    write_through=True)
        # write to the buffered io but don't overflow the buffer
        text = data.decode('ascii')
        textio.write(text)

        # buffer.flush is not called with write_through=True
        self.assertFalse(flush_called)
        # buffer.write *is* called with write_through=True
        self.assertTrue(write_called)
        self.assertEqual(rawio.getvalue(), b"") # no flush

        # Rebinding the name is visible to BufferedWriter.write(), which
        # refers to write_called as a closure variable of this function.
        write_called = [] # reset
        textio.write(text * 10) # total content is larger than bufio buffer
        self.assertTrue(write_called)
        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3395
    def test_reconfigure_write_through(self):
        # Toggle write_through with reconfigure() and check, via the raw
        # object's write stack, when written text reaches the raw stream.
        raw = self.MockRawIO([])
        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        t.write('1')
        t.reconfigure(write_through=True)  # implied flush
        self.assertEqual(t.write_through, True)
        self.assertEqual(b''.join(raw._write_stack), b'1')
        # With write_through on, the text is expected in the raw stream
        # without an explicit flush().
        t.write('23')
        self.assertEqual(b''.join(raw._write_stack), b'123')
        t.reconfigure(write_through=False)
        self.assertEqual(t.write_through, False)
        # With write_through off again, flush() is called before checking.
        t.write('45')
        t.flush()
        self.assertEqual(b''.join(raw._write_stack), b'12345')
        # Keeping default value
        # Neither a bare reconfigure() nor write_through=None may change the
        # current setting, whether it is currently False...
        t.reconfigure()
        t.reconfigure(write_through=None)
        self.assertEqual(t.write_through, False)
        # ...or currently True.
        t.reconfigure(write_through=True)
        t.reconfigure()
        t.reconfigure(write_through=None)
        self.assertEqual(t.write_through, True)
3418
3419    def test_read_nonbytes(self):
3420        # Issue #17106
3421        # Crash when underlying read() returns non-bytes
3422        t = self.TextIOWrapper(self.StringIO('a'))
3423        self.assertRaises(TypeError, t.read, 1)
3424        t = self.TextIOWrapper(self.StringIO('a'))
3425        self.assertRaises(TypeError, t.readline)
3426        t = self.TextIOWrapper(self.StringIO('a'))
3427        self.assertRaises(TypeError, t.read)
3428
3429    def test_illegal_encoder(self):
3430        # Issue 31271: Calling write() while the return value of encoder's
3431        # encode() is invalid shouldn't cause an assertion failure.
3432        rot13 = codecs.lookup("rot13")
3433        with support.swap_attr(rot13, '_is_text_encoding', True):
3434            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3435        self.assertRaises(TypeError, t.write, 'bar')
3436
    def test_illegal_decoder(self):
        # Issue #17106
        # Bypass the early encoding check added in issue 20404
        def _make_illegal_wrapper():
            # Return a TextIOWrapper using the "quopri" codec.  The codec's
            # _is_text_encoding flag is set to True only while the wrapper
            # is constructed and is reset to False afterwards, even if
            # construction raises.
            quopri = codecs.lookup("quopri")
            quopri._is_text_encoding = True
            try:
                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                       newline='\n', encoding="quopri")
            finally:
                quopri._is_text_encoding = False
            return t
        # Crash when decoder returns non-string
        # A fresh wrapper is used for each of read(1), readline() and read().
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, t.read, 1)
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, t.readline)
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, t.read)

        # Issue 31243: calling read() while the return value of decoder's
        # getstate() is invalid should neither crash the interpreter nor
        # raise a SystemError.
        def _make_very_illegal_wrapper(getstate_ret_val):
            # Like _make_illegal_wrapper(), but the codec's incremental
            # decoder factory is temporarily replaced by one producing a
            # decoder whose getstate() returns getstate_ret_val.
            class BadDecoder:
                def getstate(self):
                    return getstate_ret_val
            def _get_bad_decoder(dummy):
                return BadDecoder()
            quopri = codecs.lookup("quopri")
            with support.swap_attr(quopri, 'incrementaldecoder',
                                   _get_bad_decoder):
                return _make_illegal_wrapper()
        # Invalid getstate() results: a non-tuple, an empty tuple, and a
        # 2-tuple of ints.
        t = _make_very_illegal_wrapper(42)
        self.assertRaises(TypeError, t.read, 42)
        t = _make_very_illegal_wrapper(())
        self.assertRaises(TypeError, t.read, 42)
        t = _make_very_illegal_wrapper((1, 2))
        self.assertRaises(TypeError, t.read, 42)
3476
3477    def _check_create_at_shutdown(self, **kwargs):
3478        # Issue #20037: creating a TextIOWrapper at shutdown
3479        # shouldn't crash the interpreter.
3480        iomod = self.io.__name__
3481        code = """if 1:
3482            import codecs
3483            import {iomod} as io
3484
3485            # Avoid looking up codecs at shutdown
3486            codecs.lookup('utf-8')
3487
3488            class C:
3489                def __init__(self):
3490                    self.buf = io.BytesIO()
3491                def __del__(self):
3492                    io.TextIOWrapper(self.buf, **{kwargs})
3493                    print("ok")
3494            c = C()
3495            """.format(iomod=iomod, kwargs=kwargs)
3496        return assert_python_ok("-c", code)
3497
3498    @support.requires_type_collecting
3499    def test_create_at_shutdown_without_encoding(self):
3500        rc, out, err = self._check_create_at_shutdown()
3501        if err:
3502            # Can error out with a RuntimeError if the module state
3503            # isn't found.
3504            self.assertIn(self.shutdown_error, err.decode())
3505        else:
3506            self.assertEqual("ok", out.decode().strip())
3507
3508    @support.requires_type_collecting
3509    def test_create_at_shutdown_with_encoding(self):
3510        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3511                                                      errors='strict')
3512        self.assertFalse(err)
3513        self.assertEqual("ok", out.decode().strip())
3514
3515    def test_read_byteslike(self):
3516        r = MemviewBytesIO(b'Just some random string\n')
3517        t = self.TextIOWrapper(r, 'utf-8')
3518
3519        # TextIOwrapper will not read the full string, because
3520        # we truncate it to a multiple of the native int size
3521        # so that we can construct a more complex memoryview.
3522        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3523
3524        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3525
3526    def test_issue22849(self):
3527        class F(object):
3528            def readable(self): return True
3529            def writable(self): return True
3530            def seekable(self): return True
3531
3532        for i in range(10):
3533            try:
3534                self.TextIOWrapper(F(), encoding='utf-8')
3535            except Exception:
3536                pass
3537
3538        F.tell = lambda x: 0
3539        t = self.TextIOWrapper(F(), encoding='utf-8')
3540
3541    def test_reconfigure_encoding_read(self):
3542        # latin1 -> utf8
3543        # (latin1 can decode utf-8 encoded string)
3544        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3545        raw = self.BytesIO(data)
3546        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3547        self.assertEqual(txt.readline(), 'abc\xe9\n')
3548        with self.assertRaises(self.UnsupportedOperation):
3549            txt.reconfigure(encoding='utf-8')
3550        with self.assertRaises(self.UnsupportedOperation):
3551            txt.reconfigure(newline=None)
3552
3553    def test_reconfigure_write_fromascii(self):
3554        # ascii has a specific encodefunc in the C implementation,
3555        # but utf-8-sig has not. Make sure that we get rid of the
3556        # cached encodefunc when we switch encoders.
3557        raw = self.BytesIO()
3558        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3559        txt.write('foo\n')
3560        txt.reconfigure(encoding='utf-8-sig')
3561        txt.write('\xe9\n')
3562        txt.flush()
3563        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3564
3565    def test_reconfigure_write(self):
3566        # latin -> utf8
3567        raw = self.BytesIO()
3568        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3569        txt.write('abc\xe9\n')
3570        txt.reconfigure(encoding='utf-8')
3571        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3572        txt.write('d\xe9f\n')
3573        txt.flush()
3574        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3575
3576        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3577        # the file
3578        raw = self.BytesIO()
3579        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3580        txt.write('abc\n')
3581        txt.reconfigure(encoding='utf-8-sig')
3582        txt.write('d\xe9f\n')
3583        txt.flush()
3584        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3585
3586    def test_reconfigure_write_non_seekable(self):
3587        raw = self.BytesIO()
3588        raw.seekable = lambda: False
3589        raw.seek = None
3590        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3591        txt.write('abc\n')
3592        txt.reconfigure(encoding='utf-8-sig')
3593        txt.write('d\xe9f\n')
3594        txt.flush()
3595
3596        # If the raw stream is not seekable, there'll be a BOM
3597        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3598
    def test_reconfigure_defaults(self):
        # Check which settings reconfigure() keeps and which it resets when
        # only some of encoding / errors / newline are passed.
        # Positional arguments here are encoding, errors, newline.
        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
        # encoding=None leaves both encoding and errors as they were.
        txt.reconfigure(encoding=None)
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'replace')
        txt.write('LF\n')

        # Changing only newline leaves encoding and errors as they were.
        txt.reconfigure(newline='\r\n')
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'replace')

        # Changing only errors leaves the encoding as it was.
        txt.reconfigure(errors='ignore')
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'ignore')
        txt.write('CRLF\n')

        # Changing the encoding without passing errors is expected to put
        # errors back to 'strict'.
        txt.reconfigure(encoding='utf-8', newline=None)
        self.assertEqual(txt.errors, 'strict')
        txt.seek(0)
        self.assertEqual(txt.read(), 'LF\nCRLF\n')

        # Raw bytes: the first line was written under newline='\n', the
        # second under newline='\r\n'.
        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3621
3622    def test_reconfigure_newline(self):
3623        raw = self.BytesIO(b'CR\rEOF')
3624        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3625        txt.reconfigure(newline=None)
3626        self.assertEqual(txt.readline(), 'CR\n')
3627        raw = self.BytesIO(b'CR\rEOF')
3628        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3629        txt.reconfigure(newline='')
3630        self.assertEqual(txt.readline(), 'CR\r')
3631        raw = self.BytesIO(b'CR\rLF\nEOF')
3632        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3633        txt.reconfigure(newline='\n')
3634        self.assertEqual(txt.readline(), 'CR\rLF\n')
3635        raw = self.BytesIO(b'LF\nCR\rEOF')
3636        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3637        txt.reconfigure(newline='\r')
3638        self.assertEqual(txt.readline(), 'LF\nCR\r')
3639        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3640        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3641        txt.reconfigure(newline='\r\n')
3642        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3643
3644        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3645        txt.reconfigure(newline=None)
3646        txt.write('linesep\n')
3647        txt.reconfigure(newline='')
3648        txt.write('LF\n')
3649        txt.reconfigure(newline='\n')
3650        txt.write('LF\n')
3651        txt.reconfigure(newline='\r')
3652        txt.write('CR\n')
3653        txt.reconfigure(newline='\r\n')
3654        txt.write('CRLF\n')
3655        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3656        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3657
3658    def test_issue25862(self):
3659        # Assertion failures occurred in tell() after read() and write().
3660        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3661        t.read(1)
3662        t.read()
3663        t.tell()
3664        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3665        t.read(1)
3666        t.write('x')
3667        t.tell()
3668
3669
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back the data
       converted by _to_memoryview() instead of plain bytes'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3679
def _to_memoryview(buf):
    '''Return a memoryview over an array of C ints built from *buf*.

    Only the leading part of *buf* whose length is a whole multiple of
    the array item size is used; any remaining trailing bytes are dropped.
    '''

    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
3687
3688
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapperTest cases against the C ``io``
    # module and adds tests specific to that implementation.
    io = io
    # Substring looked for in the child's stderr by the inherited
    # create-at-shutdown test when the child reports an error.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # Re-running __init__ with an invalid newline must raise ValueError
        # and leave the wrapper unusable for reading.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

        # repr() of an object created with __new__ only (never initialized)
        # must raise an exception.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            t = self.TextIOWrapper(b, encoding="ascii")
            t.write("456def")
            # Make the wrapper part of a reference cycle (it refers to
            # itself).
            t.x = t
            wr = weakref.ref(t)
            del t
            support.gc_collect()
        # The weak reference is dead once the wrapper has been collected.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting the _CHUNK_SIZE attribute must raise AttributeError.
        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del t._CHUNK_SIZE
3738
3739
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapperTest cases against the pure-Python
    # ``_pyio`` module (imported in this file as ``pyio``).

    # Substring looked for in the child's stderr by the inherited
    # create-at-shutdown test when the child reports an error.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
3743
3744
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Tests for IncrementalNewlineDecoder.  The class under test is read
    # from self.IncrementalNewlineDecoder, which is not defined in this
    # class body (presumably attached elsewhere -- verify against the rest
    # of the file).

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            # Decode once, restore the saved state, and decode again: both
            # attempts must produce s.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # A complete three-byte sequence decodes in one step.
        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # Fed one byte at a time, nothing is produced until the last byte.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A sequence left incomplete at final=True is an error.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # A lone '\r' is held back until the following input (or
        # final=True) shows how it should be reported.
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        # Newline handling combined with multi-byte characters.
        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # accumulated output and the decoder's .newlines attribute.  With an
        # encoding, the text is encoded and fed byte by byte; with
        # encoding=None it is fed character by character as str.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the decoder no longer reports any newlines seen.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # enc is None -> the inner decoder is None as well.
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)
        # setstate() must reject a non-tuple state.
        self.assertRaises(TypeError, decoder.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not be
            # recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)

    def test_translate(self):
        # issue 35062
        # Any non-zero translate value must behave like translate=True...
        for translate in (-2, -1, 1, 2):
            decoder = codecs.getincrementaldecoder("utf-8")()
            decoder = self.IncrementalNewlineDecoder(decoder, translate)
            self.check_newline_decoding_utf8(decoder)
        # ...while translate=0 leaves '\r' and '\r\n' untouched.
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3861
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Empty subclass: inherits every test unchanged.  Judging by the name it
    # is the variant for the C implementation; the decoder class itself is
    # not set here (presumably attached elsewhere in the file -- verify).
    pass
3864
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Empty subclass: inherits every test unchanged.  Judging by the name it
    # is the variant for the pure-Python implementation; the decoder class
    # itself is not set here (presumably attached elsewhere in the file --
    # verify).
    pass
3867
3868
3869# XXX Tests for open()
3870
3871class MiscIOTest(unittest.TestCase):
3872
    def tearDown(self):
        # Remove the scratch file support.TESTFN after each test.
        support.unlink(support.TESTFN)
3875
3876    def test___all__(self):
3877        for name in self.io.__all__:
3878            obj = getattr(self.io, name, None)
3879            self.assertIsNotNone(obj, name)
3880            if name in ("open", "open_code"):
3881                continue
3882            elif "error" in name.lower() or name == "UnsupportedOperation":
3883                self.assertTrue(issubclass(obj, Exception), name)
3884            elif not name.startswith("SEEK_"):
3885                self.assertTrue(issubclass(obj, self.IOBase))
3886
    def test_attributes(self):
        # Check the .name and .mode attributes reported at each layer of
        # the objects returned by open() for several modes.

        # Unbuffered binary file: mode is reported as passed.
        f = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        # Text file opened with "U"; the open() call is wrapped in
        # check_warnings with a DeprecationWarning filter.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        self.assertEqual(f.name,            support.TESTFN)
        self.assertEqual(f.buffer.name,     support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode,            "U")
        self.assertEqual(f.buffer.mode,     "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")
        f.close()

        # Text file opened for update: the binary layers report "rb+".
        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode,            "w+")
        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # Second file object on the same descriptor: its name is the
        # integer fd.  closefd=False, so closing g is not supposed to close
        # the descriptor that f owns.
        g = self.open(f.fileno(), "wb", closefd=False)
        self.assertEqual(g.mode,     "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name,     f.fileno())
        self.assertEqual(g.raw.name, f.fileno())
        f.close()
        g.close()
3914
3915    def test_open_pipe_with_append(self):
3916        # bpo-27805: Ignore ESPIPE from lseek() in open().
3917        r, w = os.pipe()
3918        self.addCleanup(os.close, r)
3919        f = self.open(w, 'a')
3920        self.addCleanup(f.close)
3921        # Check that the file is marked non-seekable. On Windows, however, lseek
3922        # somehow succeeds on pipes.
3923        if sys.platform != 'win32':
3924            self.assertFalse(f.seekable())
3925
3926    def test_io_after_close(self):
3927        for kwargs in [
3928                {"mode": "w"},
3929                {"mode": "wb"},
3930                {"mode": "w", "buffering": 1},
3931                {"mode": "w", "buffering": 2},
3932                {"mode": "wb", "buffering": 0},
3933                {"mode": "r"},
3934                {"mode": "rb"},
3935                {"mode": "r", "buffering": 1},
3936                {"mode": "r", "buffering": 2},
3937                {"mode": "rb", "buffering": 0},
3938                {"mode": "w+"},
3939                {"mode": "w+b"},
3940                {"mode": "w+", "buffering": 1},
3941                {"mode": "w+", "buffering": 2},
3942                {"mode": "w+b", "buffering": 0},
3943            ]:
3944            f = self.open(support.TESTFN, **kwargs)
3945            f.close()
3946            self.assertRaises(ValueError, f.flush)
3947            self.assertRaises(ValueError, f.fileno)
3948            self.assertRaises(ValueError, f.isatty)
3949            self.assertRaises(ValueError, f.__iter__)
3950            if hasattr(f, "peek"):
3951                self.assertRaises(ValueError, f.peek, 1)
3952            self.assertRaises(ValueError, f.read)
3953            if hasattr(f, "read1"):
3954                self.assertRaises(ValueError, f.read1, 1024)
3955                self.assertRaises(ValueError, f.read1)
3956            if hasattr(f, "readall"):
3957                self.assertRaises(ValueError, f.readall)
3958            if hasattr(f, "readinto"):
3959                self.assertRaises(ValueError, f.readinto, bytearray(1024))
3960            if hasattr(f, "readinto1"):
3961                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
3962            self.assertRaises(ValueError, f.readline)
3963            self.assertRaises(ValueError, f.readlines)
3964            self.assertRaises(ValueError, f.readlines, 1)
3965            self.assertRaises(ValueError, f.seek, 0)
3966            self.assertRaises(ValueError, f.tell)
3967            self.assertRaises(ValueError, f.truncate)
3968            self.assertRaises(ValueError, f.write,
3969                              b"" if "b" in kwargs['mode'] else "")
3970            self.assertRaises(ValueError, f.writelines, [])
3971            self.assertRaises(ValueError, next, f)
3972
3973    def test_blockingioerror(self):
3974        # Various BlockingIOError issues
3975        class C(str):
3976            pass
3977        c = C("")
3978        b = self.BlockingIOError(1, c)
3979        c.b = b
3980        b.c = c
3981        wr = weakref.ref(c)
3982        del c, b
3983        support.gc_collect()
3984        self.assertIsNone(wr(), wr)
3985
3986    def test_abcs(self):
3987        # Test the visible base classes are ABCs.
3988        self.assertIsInstance(self.IOBase, abc.ABCMeta)
3989        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3990        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3991        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
3992
3993    def _check_abc_inheritance(self, abcmodule):
3994        with self.open(support.TESTFN, "wb", buffering=0) as f:
3995            self.assertIsInstance(f, abcmodule.IOBase)
3996            self.assertIsInstance(f, abcmodule.RawIOBase)
3997            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3998            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3999        with self.open(support.TESTFN, "wb") as f:
4000            self.assertIsInstance(f, abcmodule.IOBase)
4001            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4002            self.assertIsInstance(f, abcmodule.BufferedIOBase)
4003            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4004        with self.open(support.TESTFN, "w") as f:
4005            self.assertIsInstance(f, abcmodule.IOBase)
4006            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4007            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4008            self.assertIsInstance(f, abcmodule.TextIOBase)
4009
4010    def test_abc_inheritance(self):
4011        # Test implementations inherit from their respective ABCs
4012        self._check_abc_inheritance(self)
4013
4014    def test_abc_inheritance_official(self):
4015        # Test implementations inherit from the official ABCs of the
4016        # baseline "io" module.
4017        self._check_abc_inheritance(io)
4018
4019    def _check_warn_on_dealloc(self, *args, **kwargs):
4020        f = open(*args, **kwargs)
4021        r = repr(f)
4022        with self.assertWarns(ResourceWarning) as cm:
4023            f = None
4024            support.gc_collect()
4025        self.assertIn(r, str(cm.warning.args[0]))
4026
4027    def test_warn_on_dealloc(self):
4028        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4029        self._check_warn_on_dealloc(support.TESTFN, "wb")
4030        self._check_warn_on_dealloc(support.TESTFN, "w")
4031
4032    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4033        fds = []
4034        def cleanup_fds():
4035            for fd in fds:
4036                try:
4037                    os.close(fd)
4038                except OSError as e:
4039                    if e.errno != errno.EBADF:
4040                        raise
4041        self.addCleanup(cleanup_fds)
4042        r, w = os.pipe()
4043        fds += r, w
4044        self._check_warn_on_dealloc(r, *args, **kwargs)
4045        # When using closefd=False, there's no warning
4046        r, w = os.pipe()
4047        fds += r, w
4048        with support.check_no_resource_warning(self):
4049            open(r, *args, closefd=False, **kwargs)
4050
4051    def test_warn_on_dealloc_fd(self):
4052        self._check_warn_on_dealloc_fd("rb", buffering=0)
4053        self._check_warn_on_dealloc_fd("rb")
4054        self._check_warn_on_dealloc_fd("r")
4055
4056
4057    def test_pickling(self):
4058        # Pickling file objects is forbidden
4059        for kwargs in [
4060                {"mode": "w"},
4061                {"mode": "wb"},
4062                {"mode": "wb", "buffering": 0},
4063                {"mode": "r"},
4064                {"mode": "rb"},
4065                {"mode": "rb", "buffering": 0},
4066                {"mode": "w+"},
4067                {"mode": "w+b"},
4068                {"mode": "w+b", "buffering": 0},
4069            ]:
4070            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4071                with self.open(support.TESTFN, **kwargs) as f:
4072                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
4073
4074    def test_nonblock_pipe_write_bigbuf(self):
4075        self._test_nonblock_pipe_write(16*1024)
4076
4077    def test_nonblock_pipe_write_smallbuf(self):
4078        self._test_nonblock_pipe_write(1024)
4079
    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Push data through a non-blocking pipe and check nothing is lost.

        Both ends of a pipe are switched to non-blocking mode and wrapped in
        file objects that use *bufsize* as their buffering argument.  Data
        is written until BlockingIOError is raised, the reader is drained,
        and at the end the bytes recorded as sent must equal the bytes that
        were received.
        """
        # Chunks recorded as written / chunks read back from the pipe.
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Keep writing N-byte messages (a single lowercase
                    # letter repeated) until the writer reports that it
                    # would block.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the part that was reported as written counts as
                    # sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Retry the flush until it succeeds, draining the read end every
            # time the writer would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect whatever is left: keep calling rf.read() until it
            # returns None (presumably meaning that no more data is
            # available on the non-blocking pipe -- confirm).
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        # Leaving the ``with`` block must have closed both file objects.
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
4131
4132    def test_create_fail(self):
4133        # 'x' mode fails if file is existing
4134        with self.open(support.TESTFN, 'w'):
4135            pass
4136        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4137
4138    def test_create_writes(self):
4139        # 'x' mode opens for writing
4140        with self.open(support.TESTFN, 'xb') as f:
4141            f.write(b"spam")
4142        with self.open(support.TESTFN, 'rb') as f:
4143            self.assertEqual(b"spam", f.read())
4144
4145    def test_open_allargs(self):
4146        # there used to be a buffer overflow in the parser for rawmode
4147        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4148
4149
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the C implementation of io, plus a
    # few checks that only exist for that implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() hands back far more data than the 2-byte destination can
        # hold; readinto() must answer with ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        destination = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(destination)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter in which a daemon thread and the main
        thread both write to ``sys.<stream_name>`` until shutdown.

        If the child exits with a non-zero status, its stderr must match the
        "could not acquire lock" fatal error message for that stream.
        Otherwise stderr may contain nothing but '.' and '!' characters.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        template = (r"Fatal Python error: could not acquire lock "
                    r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                    r"at interpreter shutdown, possibly due to "
                    r"daemon threads")
        self.assertRegex(stderr_text,
                         template.format(stream_name=stream_name))

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        # Shutdown check with the writers targeting sys.stdout.
        target_stream = 'stdout'
        self.check_daemon_threads_shutdown_deadlock(target_stream)

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        # Shutdown check with the writers targeting sys.stderr.
        target_stream = 'stderr'
        self.check_daemon_threads_shutdown_deadlock(target_stream)
4207
4208
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against pyio, the Python implementation of io.
    io = pyio
4211
4212
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how io objects behave when SIGALRM interrupts a read or write.

    The implementation under test is taken from the ``io`` class attribute,
    which concrete subclasses are expected to set; every helper opens its
    streams through ``self.io.open``.
    """

    def setUp(self):
        # Install alarm_interrupt() for SIGALRM and remember the previous
        # handler so that tearDown() can put it back.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Restore whichever SIGALRM handler was active before setUp().
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from inside the handler; the
        # tests look for that exception coming out of the interrupted call.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is repeated to build a payload larger than
        support.PIPE_MAX_SIZE, *bytes* is the byte string whose first two
        bytes are expected to come out of the pipe, and *fdopen_kwargs* are
        forwarded to self.io.open() (closefd is forced to False).
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound up front so that the cleanup below does not fail with an
        # unbound name (hiding the real error) if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        # Interrupted write on a raw (unbuffered) binary file.
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        # Interrupted write on a buffered binary file.
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        # Interrupted write on a text file encoded as ASCII.
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler while the same
        object is already writing.

        *data* is written and flushed in a loop until SIGALRM fires; the
        handler then calls write() on the same object before raising
        ZeroDivisionError.  *fdopen_kwargs* are forwarded to self.io.open().
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        # Reentrant write on a buffered binary file.
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        # Reentrant write on a text file encoded as ASCII.
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() into a str for the
        comparison; *fdopen_kwargs* are forwarded to self.io.open() (closefd
        is forced to False).
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound up front so that the cleanup below does not fail with an
        # unbound name (hiding the real error) if open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        # Buffered binary read; the bytes are decoded for the comparison.
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        # Text read; the result is already a str.
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated support.PIPE_MAX_SIZE times to form the payload;
        *fdopen_kwargs* are forwarded to self.io.open() (closefd is forced
        to False).
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand any failure over to the main thread, which checks it.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: arm a second one that will start the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound up front so that the cleanup below does not fail with an
        # unbound name (hiding the real error) if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        # Write retry on a buffered binary file.
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        # Write retry on a text file encoded as Latin-1.
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4425
4426
class CSignalsTest(SignalsTest):
    # Run the signal tests against the C implementation of io.
    io = io
4429
class PySignalsTest(SignalsTest):
    # Run the signal tests against pyio, the Python implementation of io.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4437
4438
def load_tests(*args):
    """Build the test suite for this module (unittest load_tests protocol).

    Before the suite is assembled, the public names of the io implementation
    under test, plus the matching mock classes, are set as class attributes
    on the test classes: classes whose name starts with "C" receive the
    objects taken from ``io``, classes whose name starts with "Py" receive
    the ones taken from ``pyio``; any other class is left untouched.

    The positional arguments supplied by the protocol are accepted but not
    used.  Returns a ``unittest.TestSuite`` holding the tests of every
    listed class.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    # Each mock has a "C"- and a "Py"-prefixed variant among the module
    # globals; expose it to the tests under its unprefixed name.
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and absent from recent Python
    # versions; a default TestLoader collects exactly the same tests.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
4475
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
4478