• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import textwrap
32import threading
33import time
34import unittest
35import warnings
36import weakref
37from collections import deque, UserList
38from itertools import cycle, count
39from test import support
40from test.support.script_helper import (
41    assert_python_ok, assert_python_failure, run_python_until_end)
42from test.support import import_helper
43from test.support import os_helper
44from test.support import threading_helper
45from test.support import warnings_helper
46from test.support import skip_if_sanitizer
47from test.support.os_helper import FakePath
48
49import codecs
50import io  # C implementation of io
51import _pyio as pyio # Python implementation of io
52
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Fallback used when ctypes cannot be imported: an array of bytes."""
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        # Structure declaring no fields; byteslike() resizes an instance of
        # it so that it can carry the payload.
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder
68
# Whether the io.IOBase finalizer logs the exception raised by a failing
# close().  By default a release build ignores that exception silently.
IOBASE_EMITS_UNRAISABLE = hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode
72
73
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a text file opened with default settings."""
    # Any readable text file will do; this module's own source is used.
    with open(__file__, "r", encoding="latin-1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
78
79
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately provides no read() method.

    Combined with a RawIOBase it exercises the default RawIO.read(), which
    is implemented on top of readinto().  Data to be read is queued in
    ``read_stack``; a ``None`` entry makes one readinto() call return None.
    Everything written is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total readinto() calls, and how many came after the stack ran dry.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Keep an immutable copy so later changes to ``b`` are not visible.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def readable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # Not a real position, but callers only need some return value.
        return 0

    def tell(self):
        # Same placeholder value as seek().
        return 0

    def truncate(self, pos=None):
        return pos

    def readinto(self, buf):
        """Copy the next queued chunk into ``buf`` and return the byte count.

        Returns 0 once the queue is empty and None for a queued ``None``.
        A chunk larger than ``buf`` is split; the remainder stays queued.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and leave the tail for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
135
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # MockRawIOWithoutRead on top of the C implementation's RawIOBase (io).
    pass

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # MockRawIOWithoutRead on top of the Python implementation's RawIOBase
    # (_pyio).
    pass
141
142
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() served directly from the read stack."""

    def read(self, n=None):
        """Pop and return the next queued item; ``n`` is ignored.

        Once the stack is empty the call is counted in ``_extraneous_reads``
        and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is an expected condition here; any
            # other exception is a real error and must not be swallowed.
            self._extraneous_reads += 1
            return b""
152
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO on top of the C implementation's RawIOBase (io).
    pass

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO on top of the Python implementation's RawIOBase (_pyio).
    pass
158
159
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Report twice as many bytes as the parent accepted.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Return the parent's result repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report an inflated count.
        super().readinto(buf)
        return len(buf) * 5
176
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO on top of the C implementation's RawIOBase (io).
    pass

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO on top of the Python implementation's RawIOBase (_pyio).
    pass
182
183
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() announces that it started and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        self.in_flush.set()
        # Stay inside flush() for a quarter of a second.
        time.sleep(0.25)
192
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    # SlowFlushRawIO on top of the C implementation's RawIOBase (io).
    pass

class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    # SlowFlushRawIO on top of the Python implementation's RawIOBase (_pyio).
    pass
198
199
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already marked closed: further calls do nothing.
            return
        self.closed = 1
        raise OSError
207
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO on top of the C implementation's RawIOBase (io).
    pass

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO on top of the Python implementation's RawIOBase (_pyio).
    pass
213
214
class MockFileIO:
    """Mixin that records the outcome of every read() and readinto() call.

    ``read_history`` receives, per call, the number of bytes obtained (or
    None when the underlying read() returned None).
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
230
class CMockFileIO(MockFileIO, io.BytesIO):
    # MockFileIO recording reads of the C implementation's BytesIO (io).
    pass

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # MockFileIO recording reads of the Python implementation's BytesIO
    # (_pyio).
    pass
236
237
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    The positioning methods raise ``self.UnsupportedOperation``, an
    attribute this class does not define itself.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def truncate(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
250
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type raised by the MockUnseekableIO positioning methods.
    UnsupportedOperation = io.UnsupportedOperation

class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type raised by the MockUnseekableIO positioning methods.
    UnsupportedOperation = pyio.UnsupportedOperation
256
257
class MockNonBlockWriterIO:
    """Writer mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char), a
    write() stops right before the first occurrence of ``char``; a write()
    that starts with ``char`` returns None once and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                cut = -1  # blocker not present: write everything below
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
305
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Exposes the C implementation's BlockingIOError alongside the mock.
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Exposes the Python implementation's BlockingIOError alongside the mock.
    BlockingIOError = pyio.BlockingIOError
311
312
313class IOTest(unittest.TestCase):
314
    def setUp(self):
        # Unlink TESTFN before each test so no earlier file is left over.
        os_helper.unlink(os_helper.TESTFN)
317
    def tearDown(self):
        # Unlink TESTFN again after each test.
        os_helper.unlink(os_helper.TESTFN)
320
    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on the writable stream f.

        The return value of every call is checked.  The statements depend on
        each other's effect on the stream position and must stay in order.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() is expected to leave the stream position untouched.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # Step back one byte so the next write overwrites the ".".
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # Shrinking the file again must not move the position either.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # A float offset is rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
344
    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence on the readable stream f.

        The expected values assume f holds "hello world" followed by a single
        newline.  With buffered=True, argument-less read() and readinto1()
        are exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # Reuse the 5 bytes as a writable bytes-like target for readinto().
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(bytes(data), b" worl")
        data = bytearray(5)
        # Only two bytes are left, so the buffer is filled partially.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At end of file both read() and readinto() report nothing.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-length requests.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # A float offset is rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
            f.seek(0)
            data = byteslike(5)
            self.assertEqual(f.readinto1(data), 5)
            self.assertEqual(bytes(data), b"hello")
377
378    LARGE = 2**31
379
380    def large_file_ops(self, f):
381        assert f.readable()
382        assert f.writable()
383        try:
384            self.assertEqual(f.seek(self.LARGE), self.LARGE)
385        except (OverflowError, ValueError):
386            self.skipTest("no largefile support")
387        self.assertEqual(f.tell(), self.LARGE)
388        self.assertEqual(f.write(b"xxx"), 3)
389        self.assertEqual(f.tell(), self.LARGE + 3)
390        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
391        self.assertEqual(f.truncate(), self.LARGE + 2)
392        self.assertEqual(f.tell(), self.LARGE + 2)
393        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
394        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
395        self.assertEqual(f.tell(), self.LARGE + 2)
396        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
397        self.assertEqual(f.seek(-1, 2), self.LARGE)
398        self.assertEqual(f.read(2), b"x")
399
400    def test_invalid_operations(self):
401        # Try writing on a file opened in read mode and vice-versa.
402        exc = self.UnsupportedOperation
403        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
404            self.assertRaises(exc, fp.read)
405            self.assertRaises(exc, fp.readline)
406        with self.open(os_helper.TESTFN, "wb") as fp:
407            self.assertRaises(exc, fp.read)
408            self.assertRaises(exc, fp.readline)
409        with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
410            self.assertRaises(exc, fp.read)
411            self.assertRaises(exc, fp.readline)
412        with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
413            self.assertRaises(exc, fp.write, b"blah")
414            self.assertRaises(exc, fp.writelines, [b"blah\n"])
415        with self.open(os_helper.TESTFN, "rb") as fp:
416            self.assertRaises(exc, fp.write, b"blah")
417            self.assertRaises(exc, fp.writelines, [b"blah\n"])
418        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
419            self.assertRaises(exc, fp.write, "blah")
420            self.assertRaises(exc, fp.writelines, ["blah\n"])
421            # Non-zero seeking from current or end pos
422            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
423            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
424
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.

        # Each factory below builds one kind of stream object to probe.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Take writable()/write() from BufferedIOBase instead of the
            # ones the mock would otherwise provide.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Take readable()/read() from BufferedIOBase instead of the
            # ones the mock would otherwise provide.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        # (factory, abilities): "f" = fileno() works, "r" = readable,
        # "w" = writable, "s" = seekable.
        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the object deals in.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() is only expected to work when the object is
                # both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
528
529    def test_open_handles_NUL_chars(self):
530        fn_with_NUL = 'foo\0bar'
531        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
532
533        bytes_fn = bytes(fn_with_NUL, 'ascii')
534        with warnings.catch_warnings():
535            warnings.simplefilter("ignore", DeprecationWarning)
536            self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
537
538    def test_raw_file_io(self):
539        with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
540            self.assertEqual(f.readable(), False)
541            self.assertEqual(f.writable(), True)
542            self.assertEqual(f.seekable(), True)
543            self.write_ops(f)
544        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
545            self.assertEqual(f.readable(), True)
546            self.assertEqual(f.writable(), False)
547            self.assertEqual(f.seekable(), True)
548            self.read_ops(f)
549
550    def test_buffered_file_io(self):
551        with self.open(os_helper.TESTFN, "wb") as f:
552            self.assertEqual(f.readable(), False)
553            self.assertEqual(f.writable(), True)
554            self.assertEqual(f.seekable(), True)
555            self.write_ops(f)
556        with self.open(os_helper.TESTFN, "rb") as f:
557            self.assertEqual(f.readable(), True)
558            self.assertEqual(f.writable(), False)
559            self.assertEqual(f.seekable(), True)
560            self.read_ops(f, True)
561
562    def test_readline(self):
563        with self.open(os_helper.TESTFN, "wb") as f:
564            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
565        with self.open(os_helper.TESTFN, "rb") as f:
566            self.assertEqual(f.readline(), b"abc\n")
567            self.assertEqual(f.readline(10), b"def\n")
568            self.assertEqual(f.readline(2), b"xy")
569            self.assertEqual(f.readline(4), b"zzy\n")
570            self.assertEqual(f.readline(), b"foo\x00bar\n")
571            self.assertEqual(f.readline(None), b"another line")
572            self.assertRaises(TypeError, f.readline, 5.3)
573        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
574            self.assertRaises(TypeError, f.readline, 5.3)
575
576    def test_readline_nonsizeable(self):
577        # Issue #30061
578        # Crash when readline() returns an object without __len__
579        class R(self.IOBase):
580            def readline(self):
581                return None
582        self.assertRaises((TypeError, StopIteration), next, R())
583
584    def test_next_nonsizeable(self):
585        # Issue #30061
586        # Crash when __next__() returns an object without __len__
587        class R(self.IOBase):
588            def __next__(self):
589                return None
590        self.assertRaises(TypeError, R().readlines, 1)
591
592    def test_raw_bytes_io(self):
593        f = self.BytesIO()
594        self.write_ops(f)
595        data = f.getvalue()
596        self.assertEqual(data, b"hello world\n")
597        f = self.BytesIO(data)
598        self.read_ops(f, True)
599
600    def test_large_file_ops(self):
601        # On Windows and Mac OSX this test consumes large resources; It takes
602        # a long time to build the >2 GiB file and takes >2 GiB of disk space
603        # therefore the resource must be enabled to run this test.
604        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
605            support.requires(
606                'largefile',
607                'test requires %s bytes and a long time to run' % self.LARGE)
608        with self.open(os_helper.TESTFN, "w+b", 0) as f:
609            self.large_file_ops(f)
610        with self.open(os_helper.TESTFN, "w+b") as f:
611            self.large_file_ops(f)
612
    def test_with_open(self):
        # The with statement must close the file on normal exit and when the
        # body raises, for both unbuffered and buffered files.
        for bufsize in (0, 100):
            f = None
            with self.open(os_helper.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(os_helper.TESTFN, "wb", bufsize) as f:
                    # Deliberately raise ZeroDivisionError inside the block.
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")
627
628    # issue 5008
629    def test_append_mode_tell(self):
630        with self.open(os_helper.TESTFN, "wb") as f:
631            f.write(b"xxx")
632        with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
633            self.assertEqual(f.tell(), 3)
634        with self.open(os_helper.TESTFN, "ab") as f:
635            self.assertEqual(f.tell(), 3)
636        with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
637            self.assertGreater(f.tell(), 0)
638
    def test_destructor(self):
        # Dropping the last reference to an open FileIO subclass must run
        # __del__, close() and flush() in that order, and the written data
        # must be in the file afterwards.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # Chain to the parent's __del__ only if it has one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        # A ResourceWarning is expected for the file that is never closed
        # explicitly.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            f = MyFileIO(os_helper.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(os_helper.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")
664
    def _check_base_destructor(self, base):
        """Check the finalization order of a subclass of the io class *base*.

        Destroying an instance must call __del__, close() and flush() in that
        order, and instance attributes must still be readable in each of them.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # Chain to the parent's __del__ only if it has one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
694
    def test_IOBase_destructor(self):
        # Finalization order check for subclasses of IOBase.
        self._check_base_destructor(self.IOBase)
697
    def test_RawIOBase_destructor(self):
        # Finalization order check for subclasses of RawIOBase.
        self._check_base_destructor(self.RawIOBase)
700
    def test_BufferedIOBase_destructor(self):
        # Finalization order check for subclasses of BufferedIOBase.
        self._check_base_destructor(self.BufferedIOBase)
703
    def test_TextIOBase_destructor(self):
        # Finalization order check for subclasses of TextIOBase.
        self._check_base_destructor(self.TextIOBase)
706
    def test_close_flushes(self):
        # Data written through a buffered file must be in the file once the
        # with block has closed it.
        with self.open(os_helper.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
712
713    def test_array_writes(self):
714        a = array.array('i', range(10))
715        n = len(a.tobytes())
716        def check(f):
717            with f:
718                self.assertEqual(f.write(a), n)
719                f.writelines((a,))
720        check(self.BytesIO())
721        check(self.FileIO(os_helper.TESTFN, "w"))
722        check(self.BufferedWriter(self.MockRawIO()))
723        check(self.BufferedRandom(self.MockRawIO()))
724        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
725
726    def test_closefd(self):
727        self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
728                          encoding="utf-8", closefd=False)
729
    def test_read_closed(self):
        # A second file object sharing the descriptor (closefd=False) must
        # refuse to read after it has been closed, in text and binary mode.
        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
            f.write("egg\n")
        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
            file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)
        with self.open(os_helper.TESTFN, "rb") as f:
            file = self.open(f.fileno(), "rb", closefd=False)
            self.assertEqual(file.read()[:3], b"egg")
            file.close()
            self.assertRaises(ValueError, file.readinto, bytearray(1))
744
745    def test_no_closefd_with_filename(self):
746        # can't use closefd in combination with a file name
747        self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
748                          encoding="utf-8", closefd=False)
749
    def test_closefd_attr(self):
        # The raw file's closefd attribute must reflect how it was opened:
        # True when opened by name, False when opened with closefd=False.
        with self.open(os_helper.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)
757
    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            f = self.FileIO(os_helper.TESTFN, "wb")
            f.write(b"abcxxx")
            # Make the object reference itself, forming a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        # The weak reference is dead once the object has been collected.
        self.assertIsNone(wr(), wr)
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")
771
772    def test_unbounded_file(self):
773        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
774        zero = "/dev/zero"
775        if not os.path.exists(zero):
776            self.skipTest("{0} does not exist".format(zero))
777        if sys.maxsize > 0x7FFFFFFF:
778            self.skipTest("test can only run in a 32-bit address space")
779        if support.real_max_memuse < support._2G:
780            self.skipTest("test requires at least 2 GiB of memory")
781        with self.open(zero, "rb", buffering=0) as f:
782            self.assertRaises(OverflowError, f.read)
783        with self.open(zero, "rb") as f:
784            self.assertRaises(OverflowError, f.read)
785        with self.open(zero, "r") as f:
786            self.assertRaises(OverflowError, f.read)
787
    def check_flush_error_on_close(self, *args, **kwargs):
        """Open a file with the given arguments and close it with a failing flush().

        The arguments are passed to self.open() unchanged.
        """
        # Test that the file is closed despite failed flush
        # and that flush() is called before file closed.
        f = self.open(*args, **kwargs)
        # Receives the value of f.closed observed inside flush().
        closed = []
        def bad_flush():
            closed[:] = [f.closed]
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        f.flush = lambda: None  # break reference loop
802
803    def test_flush_error_on_close(self):
804        # raw file
805        # Issue #5700: io.FileIO calls flush() after file closed
806        self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
807        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
808        self.check_flush_error_on_close(fd, 'wb', buffering=0)
809        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
810        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
811        os.close(fd)
812        # buffered io
813        self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
814        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
815        self.check_flush_error_on_close(fd, 'wb')
816        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
817        self.check_flush_error_on_close(fd, 'wb', closefd=False)
818        os.close(fd)
819        # text io
820        self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
821        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
822        self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
823        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
824        self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
825        os.close(fd)
826
827    def test_multi_close(self):
828        f = self.open(os_helper.TESTFN, "wb", buffering=0)
829        f.close()
830        f.close()
831        f.close()
832        self.assertRaises(ValueError, f.flush)
833
834    def test_RawIOBase_read(self):
835        # Exercise the default limited RawIOBase.read(n) implementation (which
836        # calls readinto() internally).
837        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
838        self.assertEqual(rawio.read(2), b"ab")
839        self.assertEqual(rawio.read(2), b"c")
840        self.assertEqual(rawio.read(2), b"d")
841        self.assertEqual(rawio.read(2), None)
842        self.assertEqual(rawio.read(2), b"ef")
843        self.assertEqual(rawio.read(2), b"g")
844        self.assertEqual(rawio.read(2), None)
845        self.assertEqual(rawio.read(2), b"")
846
847    def test_types_have_dict(self):
848        test = (
849            self.IOBase(),
850            self.RawIOBase(),
851            self.TextIOBase(),
852            self.StringIO(),
853            self.BytesIO()
854        )
855        for obj in test:
856            self.assertTrue(hasattr(obj, "__dict__"))
857
858    def test_opener(self):
859        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
860            f.write("egg\n")
861        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
862        def opener(path, flags):
863            return fd
864        with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
865            self.assertEqual(f.read(), "egg\n")
866
867    def test_bad_opener_negative_1(self):
868        # Issue #27066.
869        def badopener(fname, flags):
870            return -1
871        with self.assertRaises(ValueError) as cm:
872            open('non-existent', 'r', opener=badopener)
873        self.assertEqual(str(cm.exception), 'opener returned -1')
874
875    def test_bad_opener_other_negative(self):
876        # Issue #27066.
877        def badopener(fname, flags):
878            return -2
879        with self.assertRaises(ValueError) as cm:
880            open('non-existent', 'r', opener=badopener)
881        self.assertEqual(str(cm.exception), 'opener returned -2')
882
883    def test_fileio_closefd(self):
884        # Issue #4841
885        with self.open(__file__, 'rb') as f1, \
886             self.open(__file__, 'rb') as f2:
887            fileio = self.FileIO(f1.fileno(), closefd=False)
888            # .__init__() must not close f1
889            fileio.__init__(f2.fileno(), closefd=False)
890            f1.readline()
891            # .close() must not close f2
892            fileio.close()
893            f2.readline()
894
895    def test_nonbuffered_textio(self):
896        with warnings_helper.check_no_resource_warning(self):
897            with self.assertRaises(ValueError):
898                self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
899
900    def test_invalid_newline(self):
901        with warnings_helper.check_no_resource_warning(self):
902            with self.assertRaises(ValueError):
903                self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
904
905    def test_buffered_readinto_mixin(self):
906        # Test the implementation provided by BufferedIOBase
907        class Stream(self.BufferedIOBase):
908            def read(self, size):
909                return b"12345"
910            read1 = read
911        stream = Stream()
912        for method in ("readinto", "readinto1"):
913            with self.subTest(method):
914                buffer = byteslike(5)
915                self.assertEqual(getattr(stream, method)(buffer), 5)
916                self.assertEqual(bytes(buffer), b"12345")
917
918    def test_fspath_support(self):
919        def check_path_succeeds(path):
920            with self.open(path, "w", encoding="utf-8") as f:
921                f.write("egg\n")
922
923            with self.open(path, "r", encoding="utf-8") as f:
924                self.assertEqual(f.read(), "egg\n")
925
926        check_path_succeeds(FakePath(os_helper.TESTFN))
927        check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
928
929        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
930            bad_path = FakePath(f.fileno())
931            with self.assertRaises(TypeError):
932                self.open(bad_path, 'w', encoding="utf-8")
933
934        bad_path = FakePath(None)
935        with self.assertRaises(TypeError):
936            self.open(bad_path, 'w', encoding="utf-8")
937
938        bad_path = FakePath(FloatingPointError)
939        with self.assertRaises(FloatingPointError):
940            self.open(bad_path, 'w', encoding="utf-8")
941
942        # ensure that refcounting is correct with some error conditions
943        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
944            self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
945
946    def test_RawIOBase_readall(self):
947        # Exercise the default unlimited RawIOBase.read() and readall()
948        # implementations.
949        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
950        self.assertEqual(rawio.read(), b"abcdefg")
951        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
952        self.assertEqual(rawio.readall(), b"abcdefg")
953
954    def test_BufferedIOBase_readinto(self):
955        # Exercise the default BufferedIOBase.readinto() and readinto1()
956        # implementations (which call read() or read1() internally).
957        class Reader(self.BufferedIOBase):
958            def __init__(self, avail):
959                self.avail = avail
960            def read(self, size):
961                result = self.avail[:size]
962                self.avail = self.avail[size:]
963                return result
964            def read1(self, size):
965                """Returns no more than 5 bytes at once"""
966                return self.read(min(size, 5))
967        tests = (
968            # (test method, total data available, read buffer size, expected
969            #     read size)
970            ("readinto", 10, 5, 5),
971            ("readinto", 10, 6, 6),  # More than read1() can return
972            ("readinto", 5, 6, 5),  # Buffer larger than total available
973            ("readinto", 6, 7, 6),
974            ("readinto", 10, 0, 0),  # Empty buffer
975            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
976            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
977            ("readinto1", 5, 6, 5),  # Buffer larger than total available
978            ("readinto1", 6, 7, 5),
979            ("readinto1", 10, 0, 0),  # Empty buffer
980        )
981        UNUSED_BYTE = 0x81
982        for test in tests:
983            with self.subTest(test):
984                method, avail, request, result = test
985                reader = Reader(bytes(range(avail)))
986                buffer = bytearray((UNUSED_BYTE,) * request)
987                method = getattr(reader, method)
988                self.assertEqual(method(buffer), result)
989                self.assertEqual(len(buffer), request)
990                self.assertSequenceEqual(buffer[:result], range(result))
991                unused = (UNUSED_BYTE,) * (request - result)
992                self.assertSequenceEqual(buffer[result:], unused)
993                self.assertEqual(len(reader.avail), avail - result)
994
995    def test_close_assert(self):
996        class R(self.IOBase):
997            def __setattr__(self, name, value):
998                pass
999            def flush(self):
1000                raise OSError()
1001        f = R()
1002        # This would cause an assertion failure.
1003        self.assertRaises(OSError, f.close)
1004
1005        # Silence destructor error
1006        R.flush = lambda self: None
1007
1008
class CIOTest(IOTest):
    """IOTest cases plus a regression test for IOBase finalization."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class CyclicIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        CyclicIO()
        instance = CyclicIO()
        # Make the instance reference itself.
        instance.obj = instance
        probe = weakref.ref(instance)
        del CyclicIO, instance
        support.gc_collect()
        self.assertIsNone(probe(), probe)
1028
class PyIOTest(IOTest):
    """Inherit every IOTest case without adding or overriding any."""
1031
1032
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase classes of the io and _pyio modules in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        differences = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            differences, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        differences = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            differences, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1046
1047
1048class CommonBufferedTests:
1049    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1050
1051    def test_detach(self):
1052        raw = self.MockRawIO()
1053        buf = self.tp(raw)
1054        self.assertIs(buf.detach(), raw)
1055        self.assertRaises(ValueError, buf.detach)
1056
1057        repr(buf)  # Should still work
1058
1059    def test_fileno(self):
1060        rawio = self.MockRawIO()
1061        bufio = self.tp(rawio)
1062
1063        self.assertEqual(42, bufio.fileno())
1064
1065    def test_invalid_args(self):
1066        rawio = self.MockRawIO()
1067        bufio = self.tp(rawio)
1068        # Invalid whence
1069        self.assertRaises(ValueError, bufio.seek, 0, -1)
1070        self.assertRaises(ValueError, bufio.seek, 0, 9)
1071
1072    def test_override_destructor(self):
1073        tp = self.tp
1074        record = []
1075        class MyBufferedIO(tp):
1076            def __del__(self):
1077                record.append(1)
1078                try:
1079                    f = super().__del__
1080                except AttributeError:
1081                    pass
1082                else:
1083                    f()
1084            def close(self):
1085                record.append(2)
1086                super().close()
1087            def flush(self):
1088                record.append(3)
1089                super().flush()
1090        rawio = self.MockRawIO()
1091        bufio = MyBufferedIO(rawio)
1092        del bufio
1093        support.gc_collect()
1094        self.assertEqual(record, [1, 2, 3])
1095
1096    def test_context_manager(self):
1097        # Test usability as a context manager
1098        rawio = self.MockRawIO()
1099        bufio = self.tp(rawio)
1100        def _with():
1101            with bufio:
1102                pass
1103        _with()
1104        # bufio should now be closed, and using it a second time should raise
1105        # a ValueError.
1106        self.assertRaises(ValueError, _with)
1107
1108    def test_error_through_destructor(self):
1109        # Test that the exception state is not modified by a destructor,
1110        # even if close() fails.
1111        rawio = self.CloseFailureIO()
1112        with support.catch_unraisable_exception() as cm:
1113            with self.assertRaises(AttributeError):
1114                self.tp(rawio).xyzzy
1115
1116            if not IOBASE_EMITS_UNRAISABLE:
1117                self.assertIsNone(cm.unraisable)
1118            elif cm.unraisable is not None:
1119                self.assertEqual(cm.unraisable.exc_type, OSError)
1120
1121    def test_repr(self):
1122        raw = self.MockRawIO()
1123        b = self.tp(raw)
1124        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1125        self.assertRegex(repr(b), "<%s>" % clsname)
1126        raw.name = "dummy"
1127        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
1128        raw.name = b"dummy"
1129        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
1130
1131    def test_recursive_repr(self):
1132        # Issue #25455
1133        raw = self.MockRawIO()
1134        b = self.tp(raw)
1135        with support.swap_attr(raw, 'name', b):
1136            try:
1137                repr(b)  # Should not crash
1138            except RuntimeError:
1139                pass
1140
1141    def test_flush_error_on_close(self):
1142        # Test that buffered file is closed despite failed flush
1143        # and that flush() is called before file closed.
1144        raw = self.MockRawIO()
1145        closed = []
1146        def bad_flush():
1147            closed[:] = [b.closed, raw.closed]
1148            raise OSError()
1149        raw.flush = bad_flush
1150        b = self.tp(raw)
1151        self.assertRaises(OSError, b.close) # exception not swallowed
1152        self.assertTrue(b.closed)
1153        self.assertTrue(raw.closed)
1154        self.assertTrue(closed)      # flush() called
1155        self.assertFalse(closed[0])  # flush() called before file closed
1156        self.assertFalse(closed[1])
1157        raw.flush = lambda: None  # break reference loop
1158
1159    def test_close_error_on_close(self):
1160        raw = self.MockRawIO()
1161        def bad_flush():
1162            raise OSError('flush')
1163        def bad_close():
1164            raise OSError('close')
1165        raw.close = bad_close
1166        b = self.tp(raw)
1167        b.flush = bad_flush
1168        with self.assertRaises(OSError) as err: # exception not swallowed
1169            b.close()
1170        self.assertEqual(err.exception.args, ('close',))
1171        self.assertIsInstance(err.exception.__context__, OSError)
1172        self.assertEqual(err.exception.__context__.args, ('flush',))
1173        self.assertFalse(b.closed)
1174
1175        # Silence destructor error
1176        raw.close = lambda: None
1177        b.flush = lambda: None
1178
1179    def test_nonnormalized_close_error_on_close(self):
1180        # Issue #21677
1181        raw = self.MockRawIO()
1182        def bad_flush():
1183            raise non_existing_flush
1184        def bad_close():
1185            raise non_existing_close
1186        raw.close = bad_close
1187        b = self.tp(raw)
1188        b.flush = bad_flush
1189        with self.assertRaises(NameError) as err: # exception not swallowed
1190            b.close()
1191        self.assertIn('non_existing_close', str(err.exception))
1192        self.assertIsInstance(err.exception.__context__, NameError)
1193        self.assertIn('non_existing_flush', str(err.exception.__context__))
1194        self.assertFalse(b.closed)
1195
1196        # Silence destructor error
1197        b.flush = lambda: None
1198        raw.close = lambda: None
1199
1200    def test_multi_close(self):
1201        raw = self.MockRawIO()
1202        b = self.tp(raw)
1203        b.close()
1204        b.close()
1205        b.close()
1206        self.assertRaises(ValueError, b.flush)
1207
1208    def test_unseekable(self):
1209        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1210        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1211        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1212
1213    def test_readonly_attributes(self):
1214        raw = self.MockRawIO()
1215        buf = self.tp(raw)
1216        x = self.MockRawIO()
1217        with self.assertRaises(AttributeError):
1218            buf.raw = x
1219
1220
class SizeofTest:
    """Mixin checking how sys.getsizeof() accounts for the internal buffer.

    The methods rely on self.tp, self.MockRawIO and self.assertEqual being
    provided by the class this is combined with.
    """

    @support.cpython_only
    def test_sizeof(self):
        # getsizeof() must grow by exactly the difference in buffer size.
        small, large = 4096, 8192
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=small)
        overhead = sys.getsizeof(buffered) - small
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=large)
        self.assertEqual(sys.getsizeof(buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards getsizeof().
        bufsize = 4096
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
1242
1243class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1244    read_mode = "rb"
1245
1246    def test_constructor(self):
1247        rawio = self.MockRawIO([b"abc"])
1248        bufio = self.tp(rawio)
1249        bufio.__init__(rawio)
1250        bufio.__init__(rawio, buffer_size=1024)
1251        bufio.__init__(rawio, buffer_size=16)
1252        self.assertEqual(b"abc", bufio.read())
1253        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1254        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1255        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1256        rawio = self.MockRawIO([b"abc"])
1257        bufio.__init__(rawio)
1258        self.assertEqual(b"abc", bufio.read())
1259
1260    def test_uninitialized(self):
1261        bufio = self.tp.__new__(self.tp)
1262        del bufio
1263        bufio = self.tp.__new__(self.tp)
1264        self.assertRaisesRegex((ValueError, AttributeError),
1265                               'uninitialized|has no attribute',
1266                               bufio.read, 0)
1267        bufio.__init__(self.MockRawIO())
1268        self.assertEqual(bufio.read(0), b'')
1269
1270    def test_read(self):
1271        for arg in (None, 7):
1272            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1273            bufio = self.tp(rawio)
1274            self.assertEqual(b"abcdefg", bufio.read(arg))
1275        # Invalid args
1276        self.assertRaises(ValueError, bufio.read, -2)
1277
1278    def test_read1(self):
1279        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1280        bufio = self.tp(rawio)
1281        self.assertEqual(b"a", bufio.read(1))
1282        self.assertEqual(b"b", bufio.read1(1))
1283        self.assertEqual(rawio._reads, 1)
1284        self.assertEqual(b"", bufio.read1(0))
1285        self.assertEqual(b"c", bufio.read1(100))
1286        self.assertEqual(rawio._reads, 1)
1287        self.assertEqual(b"d", bufio.read1(100))
1288        self.assertEqual(rawio._reads, 2)
1289        self.assertEqual(b"efg", bufio.read1(100))
1290        self.assertEqual(rawio._reads, 3)
1291        self.assertEqual(b"", bufio.read1(100))
1292        self.assertEqual(rawio._reads, 4)
1293
1294    def test_read1_arbitrary(self):
1295        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1296        bufio = self.tp(rawio)
1297        self.assertEqual(b"a", bufio.read(1))
1298        self.assertEqual(b"bc", bufio.read1())
1299        self.assertEqual(b"d", bufio.read1())
1300        self.assertEqual(b"efg", bufio.read1(-1))
1301        self.assertEqual(rawio._reads, 3)
1302        self.assertEqual(b"", bufio.read1())
1303        self.assertEqual(rawio._reads, 4)
1304
1305    def test_readinto(self):
1306        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1307        bufio = self.tp(rawio)
1308        b = bytearray(2)
1309        self.assertEqual(bufio.readinto(b), 2)
1310        self.assertEqual(b, b"ab")
1311        self.assertEqual(bufio.readinto(b), 2)
1312        self.assertEqual(b, b"cd")
1313        self.assertEqual(bufio.readinto(b), 2)
1314        self.assertEqual(b, b"ef")
1315        self.assertEqual(bufio.readinto(b), 1)
1316        self.assertEqual(b, b"gf")
1317        self.assertEqual(bufio.readinto(b), 0)
1318        self.assertEqual(b, b"gf")
1319        rawio = self.MockRawIO((b"abc", None))
1320        bufio = self.tp(rawio)
1321        self.assertEqual(bufio.readinto(b), 2)
1322        self.assertEqual(b, b"ab")
1323        self.assertEqual(bufio.readinto(b), 1)
1324        self.assertEqual(b, b"cb")
1325
1326    def test_readinto1(self):
1327        buffer_size = 10
1328        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1329        bufio = self.tp(rawio, buffer_size=buffer_size)
1330        b = bytearray(2)
1331        self.assertEqual(bufio.peek(3), b'abc')
1332        self.assertEqual(rawio._reads, 1)
1333        self.assertEqual(bufio.readinto1(b), 2)
1334        self.assertEqual(b, b"ab")
1335        self.assertEqual(rawio._reads, 1)
1336        self.assertEqual(bufio.readinto1(b), 1)
1337        self.assertEqual(b[:1], b"c")
1338        self.assertEqual(rawio._reads, 1)
1339        self.assertEqual(bufio.readinto1(b), 2)
1340        self.assertEqual(b, b"de")
1341        self.assertEqual(rawio._reads, 2)
1342        b = bytearray(2*buffer_size)
1343        self.assertEqual(bufio.peek(3), b'fgh')
1344        self.assertEqual(rawio._reads, 3)
1345        self.assertEqual(bufio.readinto1(b), 6)
1346        self.assertEqual(b[:6], b"fghjkl")
1347        self.assertEqual(rawio._reads, 4)
1348
1349    def test_readinto_array(self):
1350        buffer_size = 60
1351        data = b"a" * 26
1352        rawio = self.MockRawIO((data,))
1353        bufio = self.tp(rawio, buffer_size=buffer_size)
1354
1355        # Create an array with element size > 1 byte
1356        b = array.array('i', b'x' * 32)
1357        assert len(b) != 16
1358
1359        # Read into it. We should get as many *bytes* as we can fit into b
1360        # (which is more than the number of elements)
1361        n = bufio.readinto(b)
1362        self.assertGreater(n, len(b))
1363
1364        # Check that old contents of b are preserved
1365        bm = memoryview(b).cast('B')
1366        self.assertLess(n, len(bm))
1367        self.assertEqual(bm[:n], data[:n])
1368        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1369
1370    def test_readinto1_array(self):
1371        buffer_size = 60
1372        data = b"a" * 26
1373        rawio = self.MockRawIO((data,))
1374        bufio = self.tp(rawio, buffer_size=buffer_size)
1375
1376        # Create an array with element size > 1 byte
1377        b = array.array('i', b'x' * 32)
1378        assert len(b) != 16
1379
1380        # Read into it. We should get as many *bytes* as we can fit into b
1381        # (which is more than the number of elements)
1382        n = bufio.readinto1(b)
1383        self.assertGreater(n, len(b))
1384
1385        # Check that old contents of b are preserved
1386        bm = memoryview(b).cast('B')
1387        self.assertLess(n, len(bm))
1388        self.assertEqual(bm[:n], data[:n])
1389        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1390
1391    def test_readlines(self):
1392        def bufio():
1393            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1394            return self.tp(rawio)
1395        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1396        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1397        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1398
1399    def test_buffering(self):
1400        data = b"abcdefghi"
1401        dlen = len(data)
1402
1403        tests = [
1404            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1405            [ 100, [ 3, 3, 3],     [ dlen ]    ],
1406            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1407        ]
1408
1409        for bufsize, buf_read_sizes, raw_read_sizes in tests:
1410            rawio = self.MockFileIO(data)
1411            bufio = self.tp(rawio, buffer_size=bufsize)
1412            pos = 0
1413            for nbytes in buf_read_sizes:
1414                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1415                pos += nbytes
1416            # this is mildly implementation-dependent
1417            self.assertEqual(rawio.read_history, raw_read_sizes)
1418
1419    def test_read_non_blocking(self):
1420        # Inject some None's in there to simulate EWOULDBLOCK
1421        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1422        bufio = self.tp(rawio)
1423        self.assertEqual(b"abcd", bufio.read(6))
1424        self.assertEqual(b"e", bufio.read(1))
1425        self.assertEqual(b"fg", bufio.read())
1426        self.assertEqual(b"", bufio.peek(1))
1427        self.assertIsNone(bufio.read())
1428        self.assertEqual(b"", bufio.read())
1429
1430        rawio = self.MockRawIO((b"a", None, None))
1431        self.assertEqual(b"a", rawio.readall())
1432        self.assertIsNone(rawio.readall())
1433
1434    def test_read_past_eof(self):
1435        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1436        bufio = self.tp(rawio)
1437
1438        self.assertEqual(b"abcdefg", bufio.read(9000))
1439
1440    def test_read_all(self):
1441        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1442        bufio = self.tp(rawio)
1443
1444        self.assertEqual(b"abcdefg", bufio.read())
1445
1446    @support.requires_resource('cpu')
1447    def test_threads(self):
1448        try:
1449            # Write out many bytes with exactly the same number of 0's,
1450            # 1's... 255's. This will help us check that concurrent reading
1451            # doesn't duplicate or forget contents.
1452            N = 1000
1453            l = list(range(256)) * N
1454            random.shuffle(l)
1455            s = bytes(bytearray(l))
1456            with self.open(os_helper.TESTFN, "wb") as f:
1457                f.write(s)
1458            with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
1459                bufio = self.tp(raw, 8)
1460                errors = []
1461                results = []
1462                def f():
1463                    try:
1464                        # Intra-buffer read then buffer-flushing read
1465                        for n in cycle([1, 19]):
1466                            s = bufio.read(n)
1467                            if not s:
1468                                break
1469                            # list.append() is atomic
1470                            results.append(s)
1471                    except Exception as e:
1472                        errors.append(e)
1473                        raise
1474                threads = [threading.Thread(target=f) for x in range(20)]
1475                with threading_helper.start_threads(threads):
1476                    time.sleep(0.02) # yield
1477                self.assertFalse(errors,
1478                    "the following exceptions were caught: %r" % errors)
1479                s = b''.join(results)
1480                for i in range(256):
1481                    c = bytes(bytearray([i]))
1482                    self.assertEqual(s.count(c), N)
1483        finally:
1484            os_helper.unlink(os_helper.TESTFN)
1485
1486    def test_unseekable(self):
1487        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1488        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1489        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1490        bufio.read(1)
1491        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1492        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1493
1494    def test_misbehaved_io(self):
1495        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1496        bufio = self.tp(rawio)
1497        self.assertRaises(OSError, bufio.seek, 0)
1498        self.assertRaises(OSError, bufio.tell)
1499
1500        # Silence destructor error
1501        bufio.close = lambda: None
1502
1503    def test_no_extraneous_read(self):
1504        # Issue #9550; when the raw IO object has satisfied the read request,
1505        # we should not issue any additional reads, otherwise it may block
1506        # (e.g. socket).
1507        bufsize = 16
1508        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1509            rawio = self.MockRawIO([b"x" * n])
1510            bufio = self.tp(rawio, bufsize)
1511            self.assertEqual(bufio.read(n), b"x" * n)
1512            # Simple case: one raw read is enough to satisfy the request.
1513            self.assertEqual(rawio._extraneous_reads, 0,
1514                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1515            # A more complex case where two raw reads are needed to satisfy
1516            # the request.
1517            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1518            bufio = self.tp(rawio, bufsize)
1519            self.assertEqual(bufio.read(n), b"x" * n)
1520            self.assertEqual(rawio._extraneous_reads, 0,
1521                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1522
1523    def test_read_on_closed(self):
1524        # Issue #23796
1525        b = io.BufferedReader(io.BytesIO(b"12"))
1526        b.read(1)
1527        b.close()
1528        self.assertRaises(ValueError, b.peek)
1529        self.assertRaises(ValueError, b.read1, 1)
1530
1531    def test_truncate_on_read_only(self):
1532        rawio = self.MockFileIO(b"abc")
1533        bufio = self.tp(rawio)
1534        self.assertFalse(bufio.writable())
1535        self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1536        self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1537
1538
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    """BufferedReaderTest and SizeofTest cases run with io.BufferedReader."""

    tp = io.BufferedReader

    @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing "
                       "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the reader must refuse to read.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(os_helper.TESTFN, "w+b")
            reader = self.tp(raw)
            # Make the reader reference itself.
            reader.f = reader
            probe = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(probe(), probe)

    def test_args_error(self):
        # Issue #17275
        # The TypeError for surplus arguments must mention BufferedReader.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)

    def test_bad_readinto_value(self):
        # A raw readinto() returning a negative count must surface as an
        # OSError with no __cause__.
        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = lambda buf: -1
        reader = self.tp(inner)
        with self.assertRaises(OSError) as caught:
            reader.readline()
        self.assertIsNone(caught.exception.__cause__)

    def test_bad_readinto_type(self):
        # A raw readinto() returning a non-integer must surface as an
        # OSError whose __cause__ is a TypeError.
        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = lambda buf: b''
        reader = self.tp(inner)
        with self.assertRaises(OSError) as caught:
            reader.readline()
        self.assertIsInstance(caught.exception.__cause__, TypeError)
1604
1605
class PyBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest cases run with pyio.BufferedReader as self.tp."""

    tp = pyio.BufferedReader
1608
1609
# Tests shared by the buffered-writer implementations.  The class under test
# is read from the ``tp`` attribute, which concrete subclasses set.
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # File mode used by the tests below that open a real file to write to.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() can be called repeatedly on the same object.  Invalid
        # buffer sizes raise ValueError, and a later valid call makes the
        # object usable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        # Both writes must have reached the raw stream, in order.
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__() only must be deletable, must
        # reject write() with a clear error, and must work once __init__()
        # has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push the buffered bytes to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        # Nothing has reached the raw stream yet.
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        # The flushed data must be what was passed to write(), not the
        # later contents of `buffer`.
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        # Write 16 bytes in chunks of three through an 8-byte buffer,
        # without an explicit flush.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func is called with the buffered object after every
        # single write; it must not change the data that ends up written.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of `contents`.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        # Plain writes with nothing in between.
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        # A flush after every write.
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeks that end at the current position after every write, first
        # with absolute offsets and then with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        # A truncate at the current position after every write.
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        # Writes on top of a MockNonBlockWriterIO raw stream that is told,
        # through block_on(), at which byte to block.
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Write, seek back to the start, overwrite, then seek forward and
        # append: the raw stream must reflect each step.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        # The overwrite is visible in the raw stream after the seek.
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() hands the buffered bytes to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        # writelines() accepts a list of bytes objects.
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() also accepts a UserList of bytes objects.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # writelines() raises TypeError for a list of ints, for None and
        # for a str.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference must deliver the buffered bytes to
        # the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is still where the write left it.
            self.assertEqual(bufio.tell(), 6)
        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                # buffer_size + 1 bytes written plus 1 byte read.
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-split the data into chunks that the threads pull from.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    # Thread body: write chunks until the queue is empty,
                    # recording any exception for the main thread.
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(os_helper.TESTFN, "rb") as f:
                s = f.read()
            # The write order is unspecified, so only check that every byte
            # value occurs exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            os_helper.unlink(os_helper.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and write() on top of a MisbehavedRawIO mock must
        # all raise OSError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument must be rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # An OSError raised by the raw write() during close() must
        # propagate, and the buffered object must still end up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # close() runs in another thread on top of a SlowFlushRawIO mock.
        # Once that mock signals in_flush, write() from this thread must
        # raise ValueError and the object must report itself as closed.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        rawio.in_flush.wait()
        self.assertRaises(ValueError, bufio.write, b'spam')
        self.assertTrue(bufio.closed)
        t.join()
1901
1902
1903
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the buffered-writer tests against the C BufferedWriter and adds
    # checks specific to that implementation.
    tp = io.BufferedWriter

    @skip_if_sanitizer(
        memory=True, address=True,
        reason="sanitizer defaults to crashing "
               "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel, so the huge-buffer check is
        # skipped there.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After each rejected buffer_size the object must refuse writes
        # with ValueError as well.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        expected_warning = ('', ResourceWarning)
        with warnings_helper.check_warnings(expected_warning):
            raw_file = self.FileIO(os_helper.TESTFN, "w+b")
            writer = self.tp(raw_file)
            writer.write(b"123xxx")
            # Make the object reference itself so that plain reference
            # counting cannot free it.
            writer.x = writer
            probe = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(probe(), probe)
        # The buffered bytes must have reached the file.
        with self.open(os_helper.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275: the four-argument call must raise a TypeError whose
        # message names "BufferedWriter".
        call_args = (io.BytesIO(), 1024, 1024, 1024)
        self.assertRaisesRegex(TypeError, "BufferedWriter",
                               self.tp, *call_args)
1950
1951
# Runs the BufferedWriterTest suite against the pure-Python (_pyio)
# BufferedWriter.
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
1954
class BufferedRWPairTest(unittest.TestCase):
    # Tests shared by the BufferedRWPair implementations.  The class under
    # test is read from the ``tp`` attribute, which concrete subclasses set.

    def _mock_pair(self):
        # Build a pair on top of two fresh mock raw streams (reader first).
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        return self.tp(reader, writer)

    def test_constructor(self):
        rw = self._mock_pair()
        self.assertFalse(rw.closed)

    def test_uninitialized(self):
        # An object created with __new__() only must be deletable, must
        # reject read() and write() with a clear error, and must work once
        # __init__() has been called.
        expected_errors = (ValueError, AttributeError)
        expected_message = 'uninitialized|has no attribute'
        cls = self.tp
        rw = cls.__new__(cls)
        del rw
        rw = cls.__new__(cls)
        with self.assertRaisesRegex(expected_errors, expected_message):
            rw.read(0)
        with self.assertRaisesRegex(expected_errors, expected_message):
            rw.write(b'')
        rw.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw.read(0), b'')
        self.assertEqual(rw.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        rw = self._mock_pair()
        with self.assertRaises(self.UnsupportedOperation):
            rw.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument must be rejected with TypeError.
        self.assertRaises(TypeError, self.tp,
                          self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # A reader whose readable() returns False must be rejected.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        unreadable = NotReadable()
        writer = self.MockRawIO()
        with self.assertRaises(OSError):
            self.tp(unreadable, writer)

    def test_constructor_with_not_writeable(self):
        # A writer whose writable() returns False must be rejected.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader = self.MockRawIO()
        unwritable = NotWriteable()
        with self.assertRaises(OSError):
            self.tp(reader, unwritable)

    def test_read(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Sized reads followed by a read to EOF.
        steps = (((3,), b"abc"), ((1,), b"d"), ((), b"ef"))
        for read_args, expected in steps:
            self.assertEqual(rw.read(*read_args), expected)
        # None as the size also reads everything.
        rw = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        every_line = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), every_line)
        self.assertEqual(fresh_pair().readlines(), every_line)
        # With a hint of 5 only the first two lines are returned.
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        head = rw.read1(3)
        self.assertEqual(head, b"abc")
        tail = rw.read1()
        self.assertEqual(tail, b"def")

    def test_readinto(self):
        for name in ("readinto", "readinto1"):
            with self.subTest(name):
                rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                target = byteslike(b'\0' * 5)
                fill = getattr(rw, name)
                self.assertEqual(fill(target), 5)
                self.assertEqual(bytes(target), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw = self.tp(self.MockRawIO(), sink)

        rw.write(b"abc")
        rw.flush()
        scratch = bytearray(b"def")
        rw.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        rw.flush()
        # The second chunk must be what was passed to write(), not the
        # later contents of `scratch`.
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # The same bytes are still returned by the following read().
        self.assertEqual(rw.read(3), b"abc")

    def test_readable(self):
        rw = self._mock_pair()
        self.assertTrue(rw.readable())

    def test_writeable(self):
        rw = self._mock_pair()
        self.assertTrue(rw.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw = self._mock_pair()
        self.assertFalse(rw.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw = self._mock_pair()
        self.assertFalse(rw.closed)
        rw.close()
        self.assertTrue(rw.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() fails with NameError: the error propagates
        # and the writer is closed nevertheless.
        def failing_close():
            reader_non_existing
        source = self.MockRawIO()
        source.close = failing_close
        sink = self.MockRawIO()
        rw = self.tp(source, sink)
        with self.assertRaises(NameError) as caught:
            rw.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw.closed)
        self.assertFalse(source.closed)
        self.assertTrue(sink.closed)

        # Silence destructor error
        source.close = lambda: None

    def test_writer_close_error_on_close(self):
        # The writer's close() fails with NameError: the error propagates
        # and the reader is closed nevertheless.
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
            # Ignore BufferedRWPair unraisable exception
            with support.catch_unraisable_exception():
                pair = None
                support.gc_collect()
            support.gc_collect()

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is raised and the
        # writer's error is attached to it as __context__.
        def failing_reader_close():
            reader_non_existing
        def failing_writer_close():
            writer_non_existing
        source = self.MockRawIO()
        source.close = failing_reader_close
        sink = self.MockRawIO()
        sink.close = failing_writer_close
        rw = self.tp(source, sink)
        with self.assertRaises(NameError) as caught:
            rw.close()
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw.closed)
        self.assertFalse(source.closed)
        self.assertFalse(sink.closed)

        # Silence destructor error
        source.close = lambda: None
        sink.close = lambda: None

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty())
        cases = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in cases:
            rw = self.tp(SelectableIsAtty(reader_tty),
                         SelectableIsAtty(writer_tty))
            if expected:
                self.assertTrue(rw.isatty())
            else:
                self.assertFalse(rw.isatty())

    def test_weakref_clearing(self):
        rw = self._mock_pair()
        probe = weakref.ref(rw)
        # Drop the pair first, then the weak reference to it.
        del rw
        del probe  # Shouldn't segfault.
2162
# Runs the BufferedRWPairTest suite against the C BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
2165
# Runs the BufferedRWPairTest suite against the pure-Python (_pyio)
# BufferedRWPair.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
2168
2169
# Tests for buffered random-access streams.  Inherits the reader and writer
# suites and adds tests that mix reads, writes and seeks on one object.
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # File modes for the inherited tests that open a real file (write_mode
    # is read by the writer tests; read_mode presumably by the reader tests,
    # which are defined elsewhere in this file).
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited versions explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        # Run both inherited versions explicitly.
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        # Writes issued between two reads stay buffered until the second
        # read, after which they are found on the raw stream.
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        # tell() must follow reads, writes and seeks with every whence
        # value (0, 1 and 2).
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes in the middle, then read everything back.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is
        # called as read_func(bufio, n) or read_func(bufio) and must return
        # the bytes read while advancing the position past them.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        # flush() must not move the position.
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer far larger than the
            # nine bytes of test data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        # Two flushed writes overwrite the first five bytes of the stream.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run both inherited versions explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes() with a peek after every write: first at the
        # current position, then one byte back with the position restored.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # check_writes() re-reading the last byte with read() after every
        # write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # Same as test_writes_and_reads, using read1().
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # Same as test_writes_and_reads, using readinto().
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # The buffer size is 4, so the two overwrite sizes cover a write
        # smaller than the buffer and one larger than it.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Byte j was written first and byte i second, so when
                # i == j the value 1 wins.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() without an argument must return the logical position,
        # whether it was reached by reading or by writing.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both inherited versions explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each of the read methods; every
        # operation must act at the current logical position.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline(); each readline() must
        # return the rest of the line after the byte just written.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # The two assignments below replace inherited tests with None, which
    # removes them from this class (unittest only collects callables).

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point to test it over
    # a writable stream.
    test_truncate_on_read_only = None
2401
2402
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the buffered random-access tests against the C BufferedRandom
    # and adds checks specific to that implementation.
    tp = io.BufferedRandom

    @skip_if_sanitizer(
        memory=True, address=True,
        reason="sanitizer defaults to crashing "
               "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel, so the huge-buffer check is
        # skipped there.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        stream = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            stream.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the C reader version first, then the C writer version.
        for c_suite in (CBufferedReaderTest, CBufferedWriterTest):
            c_suite.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the four-argument call must raise a TypeError whose
        # message names "BufferedRandom".
        call_args = (io.BytesIO(), 1024, 1024, 1024)
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, *call_args)
2426
2427
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against _pyio.BufferedRandom."""
    # Type under test; the inherited tests reach it through self.tp.
    tp = pyio.BufferedRandom
2430
2431
2432# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2433# properties:
2434#   - A single output character can correspond to many bytes of input.
2435#   - The number of input bytes to complete the character can be
2436#     undetermined until the last input byte is received.
2437#   - The number of input bytes can vary depending on previous input.
2438#   - A single input byte can correspond to many characters of output.
2439#   - The number of output characters can be undetermined until the
2440#     last input byte is received.
2441#   - The number of output characters can vary depending on previous input.
2442
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new value of I applies to the input
    that follows the word which set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), where flags packs I and O.

        Both values have their lowest bit flipped before being packed as
        i*100 + o, so that flags == 0 right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        # Named "flags" rather than "io" so the imported io module is not
        # shadowed inside this method.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return whatever text is complete.

        With *final* true, a partially buffered word is flushed as well.
        """
        pieces = []
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    # Extra periods (empty words) are ignored.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Control words ('i...' and 'o...') update I or O and produce no
        output; a missing number counts as 0.  The buffer is emptied in
        every case.  Must only be called with a non-empty buffer.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Consulted by lookupTestDecoder(): the 'test_decoder' codec can only
    # be looked up while this is true.
    codecEnabled = False
2517
2518
2519# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
2520# when registering codecs and cleanup functions.
def lookupTestDecoder(name):
    # Codec search function: answers only for 'test_decoder', and only
    # while StatefulIncrementalDecoder.codecEnabled is true.
    if not (StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder'):
        return None
    # Encoding is borrowed from latin-1; only incremental decoding is
    # provided on the decoding side.
    latin1 = codecs.lookup('latin-1')
    return codecs.CodecInfo(
        name='test_decoder',
        encode=latin1.encode,
        decode=None,
        incrementalencoder=None,
        incrementaldecoder=StatefulIncrementalDecoder,
        streamreader=None,
        streamwriter=None)
2529
2530
2531class StatefulIncrementalDecoderTest(unittest.TestCase):
2532    """
2533    Make sure the StatefulIncrementalDecoder actually works.
2534    """
2535
2536    test_cases = [
2537        # I=1, O=1 (fixed-length input == fixed-length output)
2538        (b'abcd', False, 'a.b.c.d.'),
2539        # I=0, O=0 (variable-length input, variable-length output)
2540        (b'oiabcd', True, 'abcd.'),
2541        # I=0, O=0 (should ignore extra periods)
2542        (b'oi...abcd...', True, 'abcd.'),
2543        # I=0, O=6 (variable-length input, fixed-length output)
2544        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2545        # I=2, O=6 (fixed-length input < fixed-length output)
2546        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2547        # I=6, O=3 (fixed-length input > fixed-length output)
2548        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2549        # I=0, then 3; O=29, then 15 (with longer output)
2550        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2551         'a----------------------------.' +
2552         'b----------------------------.' +
2553         'cde--------------------------.' +
2554         'abcdefghijabcde.' +
2555         'a.b------------.' +
2556         '.c.------------.' +
2557         'd.e------------.' +
2558         'k--------------.' +
2559         'l--------------.' +
2560         'm--------------.')
2561    ]
2562
2563    def test_decoder(self):
2564        # Try a few one-shot test cases.
2565        for input, eof, output in self.test_cases:
2566            d = StatefulIncrementalDecoder()
2567            self.assertEqual(d.decode(input, eof), output)
2568
2569        # Also test an unfinished decode, followed by forcing EOF.
2570        d = StatefulIncrementalDecoder()
2571        self.assertEqual(d.decode(b'oiabcd'), '')
2572        self.assertEqual(d.decode(b'', 1), 'abcd.')
2573
2574class TextIOWrapperTest(unittest.TestCase):
2575
2576    def setUp(self):
2577        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2578        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2579        os_helper.unlink(os_helper.TESTFN)
2580        codecs.register(lookupTestDecoder)
2581        self.addCleanup(codecs.unregister, lookupTestDecoder)
2582
    def tearDown(self):
        # Remove the scratch file a test may have created.
        os_helper.unlink(os_helper.TESTFN)
2585
2586    def test_constructor(self):
2587        r = self.BytesIO(b"\xc3\xa9\n\n")
2588        b = self.BufferedReader(r, 1000)
2589        t = self.TextIOWrapper(b, encoding="utf-8")
2590        t.__init__(b, encoding="latin-1", newline="\r\n")
2591        self.assertEqual(t.encoding, "latin-1")
2592        self.assertEqual(t.line_buffering, False)
2593        t.__init__(b, encoding="utf-8", line_buffering=True)
2594        self.assertEqual(t.encoding, "utf-8")
2595        self.assertEqual(t.line_buffering, True)
2596        self.assertEqual("\xe9\n", t.readline())
2597        self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
2598        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
2599
2600    def test_uninitialized(self):
2601        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2602        del t
2603        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2604        self.assertRaises(Exception, repr, t)
2605        self.assertRaisesRegex((ValueError, AttributeError),
2606                               'uninitialized|has no attribute',
2607                               t.read, 0)
2608        t.__init__(self.MockRawIO(), encoding="utf-8")
2609        self.assertEqual(t.read(0), '')
2610
2611    def test_non_text_encoding_codecs_are_rejected(self):
2612        # Ensure the constructor complains if passed a codec that isn't
2613        # marked as a text encoding
2614        # http://bugs.python.org/issue20404
2615        r = self.BytesIO()
2616        b = self.BufferedWriter(r)
2617        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2618            self.TextIOWrapper(b, encoding="hex")
2619
2620    def test_detach(self):
2621        r = self.BytesIO()
2622        b = self.BufferedWriter(r)
2623        t = self.TextIOWrapper(b, encoding="ascii")
2624        self.assertIs(t.detach(), b)
2625
2626        t = self.TextIOWrapper(b, encoding="ascii")
2627        t.write("howdy")
2628        self.assertFalse(r.getvalue())
2629        t.detach()
2630        self.assertEqual(r.getvalue(), b"howdy")
2631        self.assertRaises(ValueError, t.detach)
2632
2633        # Operations independent of the detached stream should still work
2634        repr(t)
2635        self.assertEqual(t.encoding, "ascii")
2636        self.assertEqual(t.errors, "strict")
2637        self.assertFalse(t.line_buffering)
2638        self.assertFalse(t.write_through)
2639
2640    def test_repr(self):
2641        raw = self.BytesIO("hello".encode("utf-8"))
2642        b = self.BufferedReader(raw)
2643        t = self.TextIOWrapper(b, encoding="utf-8")
2644        modname = self.TextIOWrapper.__module__
2645        self.assertRegex(repr(t),
2646                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2647        raw.name = "dummy"
2648        self.assertRegex(repr(t),
2649                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2650        t.mode = "r"
2651        self.assertRegex(repr(t),
2652                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2653        raw.name = b"dummy"
2654        self.assertRegex(repr(t),
2655                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2656
2657        t.buffer.detach()
2658        repr(t)  # Should not raise an exception
2659
    def test_recursive_repr(self):
        # Issue #25455
        # Make the raw stream's name refer back to the wrapper itself, so
        # that computing the wrapper's repr leads back to the wrapper.
        raw = self.BytesIO()
        t = self.TextIOWrapper(raw, encoding="utf-8")
        with support.swap_attr(raw, 'name', t):
            try:
                repr(t)  # Should not crash
            except RuntimeError:
                # A RuntimeError is tolerated; this test only guards
                # against a crash.
                pass
2669
2670    def test_line_buffering(self):
2671        r = self.BytesIO()
2672        b = self.BufferedWriter(r, 1000)
2673        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
2674        t.write("X")
2675        self.assertEqual(r.getvalue(), b"")  # No flush happened
2676        t.write("Y\nZ")
2677        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2678        t.write("A\rB")
2679        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2680
2681    def test_reconfigure_line_buffering(self):
2682        r = self.BytesIO()
2683        b = self.BufferedWriter(r, 1000)
2684        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
2685        t.write("AB\nC")
2686        self.assertEqual(r.getvalue(), b"")
2687
2688        t.reconfigure(line_buffering=True)   # implicit flush
2689        self.assertEqual(r.getvalue(), b"AB\nC")
2690        t.write("DEF\nG")
2691        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2692        t.write("H")
2693        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2694        t.reconfigure(line_buffering=False)   # implicit flush
2695        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2696        t.write("IJ")
2697        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2698
2699        # Keeping default value
2700        t.reconfigure()
2701        t.reconfigure(line_buffering=None)
2702        self.assertEqual(t.line_buffering, False)
2703        t.reconfigure(line_buffering=True)
2704        t.reconfigure()
2705        t.reconfigure(line_buffering=None)
2706        self.assertEqual(t.line_buffering, True)
2707
    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
    def test_default_encoding(self):
        # A wrapper created without an explicit encoding must report the
        # encoding returned by locale.getpreferredencoding(False).
        # Snapshot the environment so the finally clause can restore it.
        old_environ = dict(os.environ)
        try:
            # try to get a user preferred encoding different than the current
            # locale encoding to check that TextIOWrapper() uses the current
            # locale encoding and not the user preferred encoding
            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
                if key in os.environ:
                    del os.environ[key]

            current_locale_encoding = locale.getpreferredencoding(False)
            b = self.BytesIO()
            with warnings.catch_warnings():
                # Omitting the encoding is the point of this test, so the
                # EncodingWarning it may trigger is silenced.
                warnings.simplefilter("ignore", EncodingWarning)
                t = self.TextIOWrapper(b)
            self.assertEqual(t.encoding, current_locale_encoding)
        finally:
            # Put back exactly the variables that were present on entry.
            os.environ.clear()
            os.environ.update(old_environ)
2728
    @support.cpython_only
    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
    def test_device_encoding(self):
        # Issue 15989
        # With encoding="locale", a fileno() result just above INT_MAX or
        # UINT_MAX must make the constructor raise OverflowError.
        import _testcapi
        b = self.BytesIO()
        b.fileno = lambda: _testcapi.INT_MAX + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
        b.fileno = lambda: _testcapi.UINT_MAX + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
2739
2740    def test_encoding(self):
2741        # Check the encoding attribute is always set, and valid
2742        b = self.BytesIO()
2743        t = self.TextIOWrapper(b, encoding="utf-8")
2744        self.assertEqual(t.encoding, "utf-8")
2745        with warnings.catch_warnings():
2746            warnings.simplefilter("ignore", EncodingWarning)
2747            t = self.TextIOWrapper(b)
2748        self.assertIsNotNone(t.encoding)
2749        codecs.lookup(t.encoding)
2750
2751    def test_encoding_errors_reading(self):
2752        # (1) default
2753        b = self.BytesIO(b"abc\n\xff\n")
2754        t = self.TextIOWrapper(b, encoding="ascii")
2755        self.assertRaises(UnicodeError, t.read)
2756        # (2) explicit strict
2757        b = self.BytesIO(b"abc\n\xff\n")
2758        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2759        self.assertRaises(UnicodeError, t.read)
2760        # (3) ignore
2761        b = self.BytesIO(b"abc\n\xff\n")
2762        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2763        self.assertEqual(t.read(), "abc\n\n")
2764        # (4) replace
2765        b = self.BytesIO(b"abc\n\xff\n")
2766        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2767        self.assertEqual(t.read(), "abc\n\ufffd\n")
2768
2769    def test_encoding_errors_writing(self):
2770        # (1) default
2771        b = self.BytesIO()
2772        t = self.TextIOWrapper(b, encoding="ascii")
2773        self.assertRaises(UnicodeError, t.write, "\xff")
2774        # (2) explicit strict
2775        b = self.BytesIO()
2776        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2777        self.assertRaises(UnicodeError, t.write, "\xff")
2778        # (3) ignore
2779        b = self.BytesIO()
2780        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2781                             newline="\n")
2782        t.write("abc\xffdef\n")
2783        t.flush()
2784        self.assertEqual(b.getvalue(), b"abcdef\n")
2785        # (4) replace
2786        b = self.BytesIO()
2787        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2788                             newline="\n")
2789        t.write("abc\xffdef\n")
2790        t.flush()
2791        self.assertEqual(b.getvalue(), b"abc?def\n")
2792
2793    def test_newlines(self):
2794        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2795
2796        tests = [
2797            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2798            [ '', input_lines ],
2799            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2800            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2801            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2802        ]
2803        encodings = (
2804            'utf-8', 'latin-1',
2805            'utf-16', 'utf-16-le', 'utf-16-be',
2806            'utf-32', 'utf-32-le', 'utf-32-be',
2807        )
2808
2809        # Try a range of buffer sizes to test the case where \r is the last
2810        # character in TextIOWrapper._pending_line.
2811        for encoding in encodings:
2812            # XXX: str.encode() should return bytes
2813            data = bytes(''.join(input_lines).encode(encoding))
2814            for do_reads in (False, True):
2815                for bufsize in range(1, 10):
2816                    for newline, exp_lines in tests:
2817                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2818                        textio = self.TextIOWrapper(bufio, newline=newline,
2819                                                  encoding=encoding)
2820                        if do_reads:
2821                            got_lines = []
2822                            while True:
2823                                c2 = textio.read(2)
2824                                if c2 == '':
2825                                    break
2826                                self.assertEqual(len(c2), 2)
2827                                got_lines.append(c2 + textio.readline())
2828                        else:
2829                            got_lines = list(textio)
2830
2831                        for got_line, exp_line in zip(got_lines, exp_lines):
2832                            self.assertEqual(got_line, exp_line)
2833                        self.assertEqual(len(got_lines), len(exp_lines))
2834
2835    def test_newlines_input(self):
2836        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2837        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2838        for newline, expected in [
2839            (None, normalized.decode("ascii").splitlines(keepends=True)),
2840            ("", testdata.decode("ascii").splitlines(keepends=True)),
2841            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2842            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2843            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2844            ]:
2845            buf = self.BytesIO(testdata)
2846            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2847            self.assertEqual(txt.readlines(), expected)
2848            txt.seek(0)
2849            self.assertEqual(txt.read(), "".join(expected))
2850
2851    def test_newlines_output(self):
2852        testdict = {
2853            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2854            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2855            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2856            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2857            }
2858        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2859        for newline, expected in tests:
2860            buf = self.BytesIO()
2861            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2862            txt.write("AAA\nB")
2863            txt.write("BB\nCCC\n")
2864            txt.write("X\rY\r\nZ")
2865            txt.flush()
2866            self.assertEqual(buf.closed, False)
2867            self.assertEqual(buf.getvalue(), expected)
2868
2869    def test_destructor(self):
2870        l = []
2871        base = self.BytesIO
2872        class MyBytesIO(base):
2873            def close(self):
2874                l.append(self.getvalue())
2875                base.close(self)
2876        b = MyBytesIO()
2877        t = self.TextIOWrapper(b, encoding="ascii")
2878        t.write("abc")
2879        del t
2880        support.gc_collect()
2881        self.assertEqual([b"abc"], l)
2882
2883    def test_override_destructor(self):
2884        record = []
2885        class MyTextIO(self.TextIOWrapper):
2886            def __del__(self):
2887                record.append(1)
2888                try:
2889                    f = super().__del__
2890                except AttributeError:
2891                    pass
2892                else:
2893                    f()
2894            def close(self):
2895                record.append(2)
2896                super().close()
2897            def flush(self):
2898                record.append(3)
2899                super().flush()
2900        b = self.BytesIO()
2901        t = MyTextIO(b, encoding="ascii")
2902        del t
2903        support.gc_collect()
2904        self.assertEqual(record, [1, 2, 3])
2905
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            # The wrapper is a temporary: the failed attribute lookup must
            # surface as AttributeError, whatever happens when the
            # temporary is destroyed.
            with self.assertRaises(AttributeError):
                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy

            # cm.unraisable is inspected inside the with block.
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(cm.unraisable)
            elif cm.unraisable is not None:
                # If something was reported as unraisable, it must be the
                # OSError (presumably raised by CloseFailureIO.close();
                # that class is defined outside this excerpt).
                self.assertEqual(cm.unraisable.exc_type, OSError)
2918
2919    # Systematic tests of the text I/O API
2920
2921    def test_basic_io(self):
2922        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2923            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2924                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
2925                f._CHUNK_SIZE = chunksize
2926                self.assertEqual(f.write("abc"), 3)
2927                f.close()
2928                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
2929                f._CHUNK_SIZE = chunksize
2930                self.assertEqual(f.tell(), 0)
2931                self.assertEqual(f.read(), "abc")
2932                cookie = f.tell()
2933                self.assertEqual(f.seek(0), 0)
2934                self.assertEqual(f.read(None), "abc")
2935                f.seek(0)
2936                self.assertEqual(f.read(2), "ab")
2937                self.assertEqual(f.read(1), "c")
2938                self.assertEqual(f.read(1), "")
2939                self.assertEqual(f.read(), "")
2940                self.assertEqual(f.tell(), cookie)
2941                self.assertEqual(f.seek(0), 0)
2942                self.assertEqual(f.seek(0, 2), cookie)
2943                self.assertEqual(f.write("def"), 3)
2944                self.assertEqual(f.seek(cookie), cookie)
2945                self.assertEqual(f.read(), "def")
2946                if enc.startswith("utf"):
2947                    self.multi_line_test(f, enc)
2948                f.close()
2949
2950    def multi_line_test(self, f, enc):
2951        f.seek(0)
2952        f.truncate()
2953        sample = "s\xff\u0fff\uffff"
2954        wlines = []
2955        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2956            chars = []
2957            for i in range(size):
2958                chars.append(sample[i % len(sample)])
2959            line = "".join(chars) + "\n"
2960            wlines.append((f.tell(), line))
2961            f.write(line)
2962        f.seek(0)
2963        rlines = []
2964        while True:
2965            pos = f.tell()
2966            line = f.readline()
2967            if not line:
2968                break
2969            rlines.append((pos, line))
2970        self.assertEqual(rlines, wlines)
2971
2972    def test_telling(self):
2973        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
2974        p0 = f.tell()
2975        f.write("\xff\n")
2976        p1 = f.tell()
2977        f.write("\xff\n")
2978        p2 = f.tell()
2979        f.seek(0)
2980        self.assertEqual(f.tell(), p0)
2981        self.assertEqual(f.readline(), "\xff\n")
2982        self.assertEqual(f.tell(), p1)
2983        self.assertEqual(f.readline(), "\xff\n")
2984        self.assertEqual(f.tell(), p2)
2985        f.seek(0)
2986        for line in f:
2987            self.assertEqual(line, "\xff\n")
2988            self.assertRaises(OSError, f.tell)
2989        self.assertEqual(f.tell(), p2)
2990        f.close()
2991
2992    def test_seeking(self):
2993        chunk_size = _default_chunk_size()
2994        prefix_size = chunk_size - 2
2995        u_prefix = "a" * prefix_size
2996        prefix = bytes(u_prefix.encode("utf-8"))
2997        self.assertEqual(len(u_prefix), len(prefix))
2998        u_suffix = "\u8888\n"
2999        suffix = bytes(u_suffix.encode("utf-8"))
3000        line = prefix + suffix
3001        with self.open(os_helper.TESTFN, "wb") as f:
3002            f.write(line*2)
3003        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3004            s = f.read(prefix_size)
3005            self.assertEqual(s, str(prefix, "ascii"))
3006            self.assertEqual(f.tell(), prefix_size)
3007            self.assertEqual(f.readline(), u_suffix)
3008
3009    def test_seeking_too(self):
3010        # Regression test for a specific bug
3011        data = b'\xe0\xbf\xbf\n'
3012        with self.open(os_helper.TESTFN, "wb") as f:
3013            f.write(data)
3014        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3015            f._CHUNK_SIZE  # Just test that it exists
3016            f._CHUNK_SIZE = 2
3017            f.readline()
3018            f.tell()
3019
3020    def test_seek_and_tell(self):
3021        #Test seek/tell using the StatefulIncrementalDecoder.
3022        # Make test faster by doing smaller seeks
3023        CHUNK_SIZE = 128
3024
3025        def test_seek_and_tell_with_data(data, min_pos=0):
3026            """Tell/seek to various points within a data stream and ensure
3027            that the decoded data returned by read() is consistent."""
3028            f = self.open(os_helper.TESTFN, 'wb')
3029            f.write(data)
3030            f.close()
3031            f = self.open(os_helper.TESTFN, encoding='test_decoder')
3032            f._CHUNK_SIZE = CHUNK_SIZE
3033            decoded = f.read()
3034            f.close()
3035
3036            for i in range(min_pos, len(decoded) + 1): # seek positions
3037                for j in [1, 5, len(decoded) - i]: # read lengths
3038                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
3039                    self.assertEqual(f.read(i), decoded[:i])
3040                    cookie = f.tell()
3041                    self.assertEqual(f.read(j), decoded[i:i + j])
3042                    f.seek(cookie)
3043                    self.assertEqual(f.read(), decoded[i:])
3044                    f.close()
3045
3046        # Enable the test decoder.
3047        StatefulIncrementalDecoder.codecEnabled = 1
3048
3049        # Run the tests.
3050        try:
3051            # Try each test case.
3052            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3053                test_seek_and_tell_with_data(input)
3054
3055            # Position each test case so that it crosses a chunk boundary.
3056            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3057                offset = CHUNK_SIZE - len(input)//2
3058                prefix = b'.'*offset
3059                # Don't bother seeking into the prefix (takes too long).
3060                min_pos = offset*2
3061                test_seek_and_tell_with_data(prefix + input, min_pos)
3062
3063        # Ensure our test decoder won't interfere with subsequent tests.
3064        finally:
3065            StatefulIncrementalDecoder.codecEnabled = 0
3066
3067    def test_multibyte_seek_and_tell(self):
3068        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
3069        f.write("AB\n\u3046\u3048\n")
3070        f.close()
3071
3072        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
3073        self.assertEqual(f.readline(), "AB\n")
3074        p0 = f.tell()
3075        self.assertEqual(f.readline(), "\u3046\u3048\n")
3076        p1 = f.tell()
3077        f.seek(p0)
3078        self.assertEqual(f.readline(), "\u3046\u3048\n")
3079        self.assertEqual(f.tell(), p1)
3080        f.close()
3081
3082    def test_seek_with_encoder_state(self):
3083        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
3084        f.write("\u00e6\u0300")
3085        p0 = f.tell()
3086        f.write("\u00e6")
3087        f.seek(p0)
3088        f.write("\u0300")
3089        f.close()
3090
3091        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
3092        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3093        f.close()
3094
3095    def test_encoded_writes(self):
3096        data = "1234567890"
3097        tests = ("utf-16",
3098                 "utf-16-le",
3099                 "utf-16-be",
3100                 "utf-32",
3101                 "utf-32-le",
3102                 "utf-32-be")
3103        for encoding in tests:
3104            buf = self.BytesIO()
3105            f = self.TextIOWrapper(buf, encoding=encoding)
3106            # Check if the BOM is written only once (see issue1753).
3107            f.write(data)
3108            f.write(data)
3109            f.seek(0)
3110            self.assertEqual(f.read(), data * 2)
3111            f.seek(0)
3112            self.assertEqual(f.read(), data * 2)
3113            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3114
3115    def test_unreadable(self):
3116        class UnReadable(self.BytesIO):
3117            def readable(self):
3118                return False
3119        txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
3120        self.assertRaises(OSError, txt.read)
3121
3122    def test_read_one_by_one(self):
3123        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
3124        reads = ""
3125        while True:
3126            c = txt.read(1)
3127            if not c:
3128                break
3129            reads += c
3130        self.assertEqual(reads, "AA\nBB")
3131
3132    def test_readlines(self):
3133        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
3134        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3135        txt.seek(0)
3136        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3137        txt.seek(0)
3138        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3139
3140    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3141    def test_read_by_chunk(self):
3142        # make sure "\r\n" straddles 128 char boundary.
3143        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
3144        reads = ""
3145        while True:
3146            c = txt.read(128)
3147            if not c:
3148                break
3149            reads += c
3150        self.assertEqual(reads, "A"*127+"\nB")
3151
3152    def test_writelines(self):
3153        l = ['ab', 'cd', 'ef']
3154        buf = self.BytesIO()
3155        txt = self.TextIOWrapper(buf, encoding="utf-8")
3156        txt.writelines(l)
3157        txt.flush()
3158        self.assertEqual(buf.getvalue(), b'abcdef')
3159
3160    def test_writelines_userlist(self):
3161        l = UserList(['ab', 'cd', 'ef'])
3162        buf = self.BytesIO()
3163        txt = self.TextIOWrapper(buf, encoding="utf-8")
3164        txt.writelines(l)
3165        txt.flush()
3166        self.assertEqual(buf.getvalue(), b'abcdef')
3167
3168    def test_writelines_error(self):
3169        txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
3170        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3171        self.assertRaises(TypeError, txt.writelines, None)
3172        self.assertRaises(TypeError, txt.writelines, b'abc')
3173
3174    def test_issue1395_1(self):
3175        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3176
3177        # read one char at a time
3178        reads = ""
3179        while True:
3180            c = txt.read(1)
3181            if not c:
3182                break
3183            reads += c
3184        self.assertEqual(reads, self.normalized)
3185
3186    def test_issue1395_2(self):
3187        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3188        txt._CHUNK_SIZE = 4
3189
3190        reads = ""
3191        while True:
3192            c = txt.read(4)
3193            if not c:
3194                break
3195            reads += c
3196        self.assertEqual(reads, self.normalized)
3197
3198    def test_issue1395_3(self):
3199        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3200        txt._CHUNK_SIZE = 4
3201
3202        reads = txt.read(4)
3203        reads += txt.read(4)
3204        reads += txt.readline()
3205        reads += txt.readline()
3206        reads += txt.readline()
3207        self.assertEqual(reads, self.normalized)
3208
3209    def test_issue1395_4(self):
3210        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3211        txt._CHUNK_SIZE = 4
3212
3213        reads = txt.read(4)
3214        reads += txt.read()
3215        self.assertEqual(reads, self.normalized)
3216
3217    def test_issue1395_5(self):
3218        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3219        txt._CHUNK_SIZE = 4
3220
3221        reads = txt.read(4)
3222        pos = txt.tell()
3223        txt.seek(0)
3224        txt.seek(pos)
3225        self.assertEqual(txt.read(4), "BBB\n")
3226
3227    def test_issue2282(self):
3228        buffer = self.BytesIO(self.testdata)
3229        txt = self.TextIOWrapper(buffer, encoding="ascii")
3230
3231        self.assertEqual(buffer.seekable(), txt.seekable())
3232
3233    def test_append_bom(self):
3234        # The BOM is not written again when appending to a non-empty file
3235        filename = os_helper.TESTFN
3236        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3237            with self.open(filename, 'w', encoding=charset) as f:
3238                f.write('aaa')
3239                pos = f.tell()
3240            with self.open(filename, 'rb') as f:
3241                self.assertEqual(f.read(), 'aaa'.encode(charset))
3242
3243            with self.open(filename, 'a', encoding=charset) as f:
3244                f.write('xxx')
3245            with self.open(filename, 'rb') as f:
3246                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3247
3248    def test_seek_bom(self):
3249        # Same test, but when seeking manually
3250        filename = os_helper.TESTFN
3251        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3252            with self.open(filename, 'w', encoding=charset) as f:
3253                f.write('aaa')
3254                pos = f.tell()
3255            with self.open(filename, 'r+', encoding=charset) as f:
3256                f.seek(pos)
3257                f.write('zzz')
3258                f.seek(0)
3259                f.write('bbb')
3260            with self.open(filename, 'rb') as f:
3261                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3262
3263    def test_seek_append_bom(self):
3264        # Same test, but first seek to the start and then to the end
3265        filename = os_helper.TESTFN
3266        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3267            with self.open(filename, 'w', encoding=charset) as f:
3268                f.write('aaa')
3269            with self.open(filename, 'a', encoding=charset) as f:
3270                f.seek(0)
3271                f.seek(0, self.SEEK_END)
3272                f.write('xxx')
3273            with self.open(filename, 'rb') as f:
3274                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3275
3276    def test_errors_property(self):
3277        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
3278            self.assertEqual(f.errors, "strict")
3279        with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
3280            self.assertEqual(f.errors, "replace")
3281
3282    @support.no_tracing
3283    def test_threads_write(self):
3284        # Issue6750: concurrent writes could duplicate data
3285        event = threading.Event()
3286        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
3287            def run(n):
3288                text = "Thread%03d\n" % n
3289                event.wait()
3290                f.write(text)
3291            threads = [threading.Thread(target=run, args=(x,))
3292                       for x in range(20)]
3293            with threading_helper.start_threads(threads, event.set):
3294                time.sleep(0.02)
3295        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
3296            content = f.read()
3297            for n in range(20):
3298                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3299
3300    def test_flush_error_on_close(self):
3301        # Test that text file is closed despite failed flush
3302        # and that flush() is called before file closed.
3303        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3304        closed = []
3305        def bad_flush():
3306            closed[:] = [txt.closed, txt.buffer.closed]
3307            raise OSError()
3308        txt.flush = bad_flush
3309        self.assertRaises(OSError, txt.close) # exception not swallowed
3310        self.assertTrue(txt.closed)
3311        self.assertTrue(txt.buffer.closed)
3312        self.assertTrue(closed)      # flush() called
3313        self.assertFalse(closed[0])  # flush() called before file closed
3314        self.assertFalse(closed[1])
3315        txt.flush = lambda: None  # break reference loop
3316
3317    def test_close_error_on_close(self):
3318        buffer = self.BytesIO(self.testdata)
3319        def bad_flush():
3320            raise OSError('flush')
3321        def bad_close():
3322            raise OSError('close')
3323        buffer.close = bad_close
3324        txt = self.TextIOWrapper(buffer, encoding="ascii")
3325        txt.flush = bad_flush
3326        with self.assertRaises(OSError) as err: # exception not swallowed
3327            txt.close()
3328        self.assertEqual(err.exception.args, ('close',))
3329        self.assertIsInstance(err.exception.__context__, OSError)
3330        self.assertEqual(err.exception.__context__.args, ('flush',))
3331        self.assertFalse(txt.closed)
3332
3333        # Silence destructor error
3334        buffer.close = lambda: None
3335        txt.flush = lambda: None
3336
3337    def test_nonnormalized_close_error_on_close(self):
3338        # Issue #21677
3339        buffer = self.BytesIO(self.testdata)
3340        def bad_flush():
3341            raise non_existing_flush
3342        def bad_close():
3343            raise non_existing_close
3344        buffer.close = bad_close
3345        txt = self.TextIOWrapper(buffer, encoding="ascii")
3346        txt.flush = bad_flush
3347        with self.assertRaises(NameError) as err: # exception not swallowed
3348            txt.close()
3349        self.assertIn('non_existing_close', str(err.exception))
3350        self.assertIsInstance(err.exception.__context__, NameError)
3351        self.assertIn('non_existing_flush', str(err.exception.__context__))
3352        self.assertFalse(txt.closed)
3353
3354        # Silence destructor error
3355        buffer.close = lambda: None
3356        txt.flush = lambda: None
3357
3358    def test_multi_close(self):
3359        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3360        txt.close()
3361        txt.close()
3362        txt.close()
3363        self.assertRaises(ValueError, txt.flush)
3364
3365    def test_unseekable(self):
3366        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
3367        self.assertRaises(self.UnsupportedOperation, txt.tell)
3368        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3369
3370    def test_readonly_attributes(self):
3371        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3372        buf = self.BytesIO(self.testdata)
3373        with self.assertRaises(AttributeError):
3374            txt.buffer = buf
3375
3376    def test_rawio(self):
3377        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3378        # that subprocess.Popen() can have the required unbuffered
3379        # semantics with universal_newlines=True.
3380        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3381        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3382        # Reads
3383        self.assertEqual(txt.read(4), 'abcd')
3384        self.assertEqual(txt.readline(), 'efghi\n')
3385        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3386
3387    def test_rawio_write_through(self):
3388        # Issue #12591: with write_through=True, writes don't need a flush
3389        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3390        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3391                                 write_through=True)
3392        txt.write('1')
3393        txt.write('23\n4')
3394        txt.write('5')
3395        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3396
3397    def test_bufio_write_through(self):
3398        # Issue #21396: write_through=True doesn't force a flush()
3399        # on the underlying binary buffered object.
3400        flush_called, write_called = [], []
3401        class BufferedWriter(self.BufferedWriter):
3402            def flush(self, *args, **kwargs):
3403                flush_called.append(True)
3404                return super().flush(*args, **kwargs)
3405            def write(self, *args, **kwargs):
3406                write_called.append(True)
3407                return super().write(*args, **kwargs)
3408
3409        rawio = self.BytesIO()
3410        data = b"a"
3411        bufio = BufferedWriter(rawio, len(data)*2)
3412        textio = self.TextIOWrapper(bufio, encoding='ascii',
3413                                    write_through=True)
3414        # write to the buffered io but don't overflow the buffer
3415        text = data.decode('ascii')
3416        textio.write(text)
3417
3418        # buffer.flush is not called with write_through=True
3419        self.assertFalse(flush_called)
3420        # buffer.write *is* called with write_through=True
3421        self.assertTrue(write_called)
3422        self.assertEqual(rawio.getvalue(), b"") # no flush
3423
3424        write_called = [] # reset
3425        textio.write(text * 10) # total content is larger than bufio buffer
3426        self.assertTrue(write_called)
3427        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3428
3429    def test_reconfigure_write_through(self):
3430        raw = self.MockRawIO([])
3431        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3432        t.write('1')
3433        t.reconfigure(write_through=True)  # implied flush
3434        self.assertEqual(t.write_through, True)
3435        self.assertEqual(b''.join(raw._write_stack), b'1')
3436        t.write('23')
3437        self.assertEqual(b''.join(raw._write_stack), b'123')
3438        t.reconfigure(write_through=False)
3439        self.assertEqual(t.write_through, False)
3440        t.write('45')
3441        t.flush()
3442        self.assertEqual(b''.join(raw._write_stack), b'12345')
3443        # Keeping default value
3444        t.reconfigure()
3445        t.reconfigure(write_through=None)
3446        self.assertEqual(t.write_through, False)
3447        t.reconfigure(write_through=True)
3448        t.reconfigure()
3449        t.reconfigure(write_through=None)
3450        self.assertEqual(t.write_through, True)
3451
3452    def test_read_nonbytes(self):
3453        # Issue #17106
3454        # Crash when underlying read() returns non-bytes
3455        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3456        self.assertRaises(TypeError, t.read, 1)
3457        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3458        self.assertRaises(TypeError, t.readline)
3459        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3460        self.assertRaises(TypeError, t.read)
3461
3462    def test_illegal_encoder(self):
3463        # Issue 31271: Calling write() while the return value of encoder's
3464        # encode() is invalid shouldn't cause an assertion failure.
3465        rot13 = codecs.lookup("rot13")
3466        with support.swap_attr(rot13, '_is_text_encoding', True):
3467            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3468        self.assertRaises(TypeError, t.write, 'bar')
3469
3470    def test_illegal_decoder(self):
3471        # Issue #17106
3472        # Bypass the early encoding check added in issue 20404
3473        def _make_illegal_wrapper():
3474            quopri = codecs.lookup("quopri")
3475            quopri._is_text_encoding = True
3476            try:
3477                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3478                                       newline='\n', encoding="quopri")
3479            finally:
3480                quopri._is_text_encoding = False
3481            return t
3482        # Crash when decoder returns non-string
3483        t = _make_illegal_wrapper()
3484        self.assertRaises(TypeError, t.read, 1)
3485        t = _make_illegal_wrapper()
3486        self.assertRaises(TypeError, t.readline)
3487        t = _make_illegal_wrapper()
3488        self.assertRaises(TypeError, t.read)
3489
3490        # Issue 31243: calling read() while the return value of decoder's
3491        # getstate() is invalid should neither crash the interpreter nor
3492        # raise a SystemError.
3493        def _make_very_illegal_wrapper(getstate_ret_val):
3494            class BadDecoder:
3495                def getstate(self):
3496                    return getstate_ret_val
3497            def _get_bad_decoder(dummy):
3498                return BadDecoder()
3499            quopri = codecs.lookup("quopri")
3500            with support.swap_attr(quopri, 'incrementaldecoder',
3501                                   _get_bad_decoder):
3502                return _make_illegal_wrapper()
3503        t = _make_very_illegal_wrapper(42)
3504        self.assertRaises(TypeError, t.read, 42)
3505        t = _make_very_illegal_wrapper(())
3506        self.assertRaises(TypeError, t.read, 42)
3507        t = _make_very_illegal_wrapper((1, 2))
3508        self.assertRaises(TypeError, t.read, 42)
3509
3510    def _check_create_at_shutdown(self, **kwargs):
3511        # Issue #20037: creating a TextIOWrapper at shutdown
3512        # shouldn't crash the interpreter.
3513        iomod = self.io.__name__
3514        code = """if 1:
3515            import codecs
3516            import {iomod} as io
3517
3518            # Avoid looking up codecs at shutdown
3519            codecs.lookup('utf-8')
3520
3521            class C:
3522                def __init__(self):
3523                    self.buf = io.BytesIO()
3524                def __del__(self):
3525                    io.TextIOWrapper(self.buf, **{kwargs})
3526                    print("ok")
3527            c = C()
3528            """.format(iomod=iomod, kwargs=kwargs)
3529        return assert_python_ok("-c", code)
3530
3531    def test_create_at_shutdown_without_encoding(self):
3532        rc, out, err = self._check_create_at_shutdown()
3533        if err:
3534            # Can error out with a RuntimeError if the module state
3535            # isn't found.
3536            self.assertIn(self.shutdown_error, err.decode())
3537        else:
3538            self.assertEqual("ok", out.decode().strip())
3539
3540    def test_create_at_shutdown_with_encoding(self):
3541        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3542                                                      errors='strict')
3543        self.assertFalse(err)
3544        self.assertEqual("ok", out.decode().strip())
3545
3546    def test_read_byteslike(self):
3547        r = MemviewBytesIO(b'Just some random string\n')
3548        t = self.TextIOWrapper(r, 'utf-8')
3549
3550        # TextIOwrapper will not read the full string, because
3551        # we truncate it to a multiple of the native int size
3552        # so that we can construct a more complex memoryview.
3553        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3554
3555        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3556
3557    def test_issue22849(self):
3558        class F(object):
3559            def readable(self): return True
3560            def writable(self): return True
3561            def seekable(self): return True
3562
3563        for i in range(10):
3564            try:
3565                self.TextIOWrapper(F(), encoding='utf-8')
3566            except Exception:
3567                pass
3568
3569        F.tell = lambda x: 0
3570        t = self.TextIOWrapper(F(), encoding='utf-8')
3571
3572    def test_reconfigure_encoding_read(self):
3573        # latin1 -> utf8
3574        # (latin1 can decode utf-8 encoded string)
3575        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3576        raw = self.BytesIO(data)
3577        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3578        self.assertEqual(txt.readline(), 'abc\xe9\n')
3579        with self.assertRaises(self.UnsupportedOperation):
3580            txt.reconfigure(encoding='utf-8')
3581        with self.assertRaises(self.UnsupportedOperation):
3582            txt.reconfigure(newline=None)
3583
3584    def test_reconfigure_write_fromascii(self):
3585        # ascii has a specific encodefunc in the C implementation,
3586        # but utf-8-sig has not. Make sure that we get rid of the
3587        # cached encodefunc when we switch encoders.
3588        raw = self.BytesIO()
3589        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3590        txt.write('foo\n')
3591        txt.reconfigure(encoding='utf-8-sig')
3592        txt.write('\xe9\n')
3593        txt.flush()
3594        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3595
3596    def test_reconfigure_write(self):
3597        # latin -> utf8
3598        raw = self.BytesIO()
3599        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3600        txt.write('abc\xe9\n')
3601        txt.reconfigure(encoding='utf-8')
3602        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3603        txt.write('d\xe9f\n')
3604        txt.flush()
3605        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3606
3607        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3608        # the file
3609        raw = self.BytesIO()
3610        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3611        txt.write('abc\n')
3612        txt.reconfigure(encoding='utf-8-sig')
3613        txt.write('d\xe9f\n')
3614        txt.flush()
3615        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3616
3617    def test_reconfigure_write_non_seekable(self):
3618        raw = self.BytesIO()
3619        raw.seekable = lambda: False
3620        raw.seek = None
3621        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622        txt.write('abc\n')
3623        txt.reconfigure(encoding='utf-8-sig')
3624        txt.write('d\xe9f\n')
3625        txt.flush()
3626
3627        # If the raw stream is not seekable, there'll be a BOM
3628        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3629
3630    def test_reconfigure_defaults(self):
3631        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3632        txt.reconfigure(encoding=None)
3633        self.assertEqual(txt.encoding, 'ascii')
3634        self.assertEqual(txt.errors, 'replace')
3635        txt.write('LF\n')
3636
3637        txt.reconfigure(newline='\r\n')
3638        self.assertEqual(txt.encoding, 'ascii')
3639        self.assertEqual(txt.errors, 'replace')
3640
3641        txt.reconfigure(errors='ignore')
3642        self.assertEqual(txt.encoding, 'ascii')
3643        self.assertEqual(txt.errors, 'ignore')
3644        txt.write('CRLF\n')
3645
3646        txt.reconfigure(encoding='utf-8', newline=None)
3647        self.assertEqual(txt.errors, 'strict')
3648        txt.seek(0)
3649        self.assertEqual(txt.read(), 'LF\nCRLF\n')
3650
3651        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3652
3653    def test_reconfigure_newline(self):
3654        raw = self.BytesIO(b'CR\rEOF')
3655        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3656        txt.reconfigure(newline=None)
3657        self.assertEqual(txt.readline(), 'CR\n')
3658        raw = self.BytesIO(b'CR\rEOF')
3659        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3660        txt.reconfigure(newline='')
3661        self.assertEqual(txt.readline(), 'CR\r')
3662        raw = self.BytesIO(b'CR\rLF\nEOF')
3663        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3664        txt.reconfigure(newline='\n')
3665        self.assertEqual(txt.readline(), 'CR\rLF\n')
3666        raw = self.BytesIO(b'LF\nCR\rEOF')
3667        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3668        txt.reconfigure(newline='\r')
3669        self.assertEqual(txt.readline(), 'LF\nCR\r')
3670        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3671        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3672        txt.reconfigure(newline='\r\n')
3673        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3674
3675        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3676        txt.reconfigure(newline=None)
3677        txt.write('linesep\n')
3678        txt.reconfigure(newline='')
3679        txt.write('LF\n')
3680        txt.reconfigure(newline='\n')
3681        txt.write('LF\n')
3682        txt.reconfigure(newline='\r')
3683        txt.write('CR\n')
3684        txt.reconfigure(newline='\r\n')
3685        txt.write('CRLF\n')
3686        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3687        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3688
3689    def test_issue25862(self):
3690        # Assertion failures occurred in tell() after read() and write().
3691        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3692        t.read(1)
3693        t.read()
3694        t.tell()
3695        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3696        t.read(1)
3697        t.write('x')
3698        t.tell()
3699
3700
class MemviewBytesIO(io.BytesIO):
    """BytesIO variant whose read() and read1() return memoryviews.

    Whatever the base class reads is passed through _to_memoryview()
    before being handed back to the caller.
    """

    def read1(self, len_):
        data = super().read1(len_)
        return _to_memoryview(data)

    def read(self, len_):
        data = super().read(len_)
        return _to_memoryview(data)
3710
def _to_memoryview(buf):
    """Convert bytes-object *buf* to a non-trivial memoryview.

    The bytes are loaded into an array of native ints; trailing bytes that
    do not fill a whole item are dropped.
    """
    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
3718
3719
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the C ``io`` module, plus extra checks."""
    io = io
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        """A failed re-__init__ breaks the wrapper; a bare __new__ has no repr."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        txt = self.TextIOWrapper(buffered, encoding="utf-8")
        with self.assertRaises(ValueError):
            txt.__init__(buffered, encoding="utf-8", newline='xyzzy')
        with self.assertRaises(ValueError):
            txt.read()

        txt = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(txt)

    def test_garbage_collection(self):
        """Collecting an unclosed wrapper flushes its data to disk."""
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            raw = io.FileIO(os_helper.TESTFN, "wb")
            buffered = self.BufferedWriter(raw)
            txt = self.TextIOWrapper(buffered, encoding="ascii")
            txt.write("456def")
            txt.x = txt  # self-reference, so only the GC can reclaim it
            ref = weakref.ref(txt)
            del txt
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(os_helper.TESTFN, "rb") as f:
            on_disk = f.read()
        self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        """Collecting wrappers over BufferedRWPair cycles must not crash."""
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        """Deleting _CHUNK_SIZE raises AttributeError."""
        txt = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        self.assertRaises(AttributeError, delattr, txt, "_CHUNK_SIZE")

    def test_internal_buffer_size(self):
        """The wrapper never hands the raw object more than one chunk."""
        # bpo-43260: TextIOWrapper's internal buffer should not store
        # data larger than chunk size.
        limit = 8192  # default chunk size, updated later

        class SizeCheckedIO(self.MockRawIO):
            def write(self, data):
                # Reads the enclosing 'limit', which is rebound below.
                if len(data) > limit:
                    raise RuntimeError
                return super().write(data)

        raw = SizeCheckedIO()
        txt = self.TextIOWrapper(raw, encoding="ascii")
        limit = txt._CHUNK_SIZE
        for piece in ("abc", "def"):
            txt.write(piece)
        # The default chunk size is 8192 bytes, so txt doesn't write this
        # small amount of data to raw yet.
        self.assertEqual([], raw._write_stack)

        with self.assertRaises(RuntimeError):
            txt.write("x" * (limit + 1))

        self.assertEqual([b"abcdef"], raw._write_stack)
        txt.write("ghi")
        txt.write("x" * limit)
        expected_stack = [b"abcdef", b"ghi", b"x" * limit]
        self.assertEqual(expected_stack, raw._write_stack)
3796
3797
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the pure-Python ``_pyio`` module."""
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
3801
3802
3803class IncrementalNewlineDecoderTest(unittest.TestCase):
3804
3805    def check_newline_decoding_utf8(self, decoder):
3806        # UTF-8 specific tests for a newline decoder
3807        def _check_decode(b, s, **kwargs):
3808            # We exercise getstate() / setstate() as well as decode()
3809            state = decoder.getstate()
3810            self.assertEqual(decoder.decode(b, **kwargs), s)
3811            decoder.setstate(state)
3812            self.assertEqual(decoder.decode(b, **kwargs), s)
3813
3814        _check_decode(b'\xe8\xa2\x88', "\u8888")
3815
3816        _check_decode(b'\xe8', "")
3817        _check_decode(b'\xa2', "")
3818        _check_decode(b'\x88', "\u8888")
3819
3820        _check_decode(b'\xe8', "")
3821        _check_decode(b'\xa2', "")
3822        _check_decode(b'\x88', "\u8888")
3823
3824        _check_decode(b'\xe8', "")
3825        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3826
3827        decoder.reset()
3828        _check_decode(b'\n', "\n")
3829        _check_decode(b'\r', "")
3830        _check_decode(b'', "\n", final=True)
3831        _check_decode(b'\r', "\n", final=True)
3832
3833        _check_decode(b'\r', "")
3834        _check_decode(b'a', "\na")
3835
3836        _check_decode(b'\r\r\n', "\n\n")
3837        _check_decode(b'\r', "")
3838        _check_decode(b'\r', "\n")
3839        _check_decode(b'\na', "\na")
3840
3841        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3842        _check_decode(b'\xe8\xa2\x88', "\u8888")
3843        _check_decode(b'\n', "\n")
3844        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3845        _check_decode(b'\n', "\n")
3846
3847    def check_newline_decoding(self, decoder, encoding):
3848        result = []
3849        if encoding is not None:
3850            encoder = codecs.getincrementalencoder(encoding)()
3851            def _decode_bytewise(s):
3852                # Decode one byte at a time
3853                for b in encoder.encode(s):
3854                    result.append(decoder.decode(bytes([b])))
3855        else:
3856            encoder = None
3857            def _decode_bytewise(s):
3858                # Decode one char at a time
3859                for c in s:
3860                    result.append(decoder.decode(c))
3861        self.assertEqual(decoder.newlines, None)
3862        _decode_bytewise("abc\n\r")
3863        self.assertEqual(decoder.newlines, '\n')
3864        _decode_bytewise("\nabc")
3865        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3866        _decode_bytewise("abc\r")
3867        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3868        _decode_bytewise("abc")
3869        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
3870        _decode_bytewise("abc\r")
3871        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
3872        decoder.reset()
3873        input = "abc"
3874        if encoder is not None:
3875            encoder.reset()
3876            input = encoder.encode(input)
3877        self.assertEqual(decoder.decode(input), "abc")
3878        self.assertEqual(decoder.newlines, None)
3879
3880    def test_newline_decoder(self):
3881        encodings = (
3882            # None meaning the IncrementalNewlineDecoder takes unicode input
3883            # rather than bytes input
3884            None, 'utf-8', 'latin-1',
3885            'utf-16', 'utf-16-le', 'utf-16-be',
3886            'utf-32', 'utf-32-le', 'utf-32-be',
3887        )
3888        for enc in encodings:
3889            decoder = enc and codecs.getincrementaldecoder(enc)()
3890            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3891            self.check_newline_decoding(decoder, enc)
3892        decoder = codecs.getincrementaldecoder("utf-8")()
3893        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3894        self.check_newline_decoding_utf8(decoder)
3895        self.assertRaises(TypeError, decoder.setstate, 42)
3896
3897    def test_newline_bytes(self):
3898        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3899        def _check(dec):
3900            self.assertEqual(dec.newlines, None)
3901            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3902            self.assertEqual(dec.newlines, None)
3903            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3904            self.assertEqual(dec.newlines, None)
3905        dec = self.IncrementalNewlineDecoder(None, translate=False)
3906        _check(dec)
3907        dec = self.IncrementalNewlineDecoder(None, translate=True)
3908        _check(dec)
3909
3910    def test_translate(self):
3911        # issue 35062
3912        for translate in (-2, -1, 1, 2):
3913            decoder = codecs.getincrementaldecoder("utf-8")()
3914            decoder = self.IncrementalNewlineDecoder(decoder, translate)
3915            self.check_newline_decoding_utf8(decoder)
3916        decoder = codecs.getincrementaldecoder("utf-8")()
3917        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3918        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3919
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # No body needed: load_tests() copies the names of the C ``io`` module
    # onto every listed test class whose name starts with "C".
    pass
3922
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # No body needed: load_tests() copies the names of the ``_pyio`` module
    # onto every listed test class whose name starts with "Py".
    pass
3925
3926
3927# XXX Tests for open()
3928
3929class MiscIOTest(unittest.TestCase):
3930
    def tearDown(self):
        # Remove the scratch file that the tests of this class may have
        # created.
        os_helper.unlink(os_helper.TESTFN)
3933
3934    def test___all__(self):
3935        for name in self.io.__all__:
3936            obj = getattr(self.io, name, None)
3937            self.assertIsNotNone(obj, name)
3938            if name in ("open", "open_code"):
3939                continue
3940            elif "error" in name.lower() or name == "UnsupportedOperation":
3941                self.assertTrue(issubclass(obj, Exception), name)
3942            elif not name.startswith("SEEK_"):
3943                self.assertTrue(issubclass(obj, self.IOBase))
3944
3945    def test_attributes(self):
3946        f = self.open(os_helper.TESTFN, "wb", buffering=0)
3947        self.assertEqual(f.mode, "wb")
3948        f.close()
3949
3950        with warnings_helper.check_warnings(('', DeprecationWarning)):
3951            f = self.open(os_helper.TESTFN, "U", encoding="utf-8")
3952        self.assertEqual(f.name,            os_helper.TESTFN)
3953        self.assertEqual(f.buffer.name,     os_helper.TESTFN)
3954        self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
3955        self.assertEqual(f.mode,            "U")
3956        self.assertEqual(f.buffer.mode,     "rb")
3957        self.assertEqual(f.buffer.raw.mode, "rb")
3958        f.close()
3959
3960        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
3961        self.assertEqual(f.mode,            "w+")
3962        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3963        self.assertEqual(f.buffer.raw.mode, "rb+")
3964
3965        g = self.open(f.fileno(), "wb", closefd=False)
3966        self.assertEqual(g.mode,     "wb")
3967        self.assertEqual(g.raw.mode, "wb")
3968        self.assertEqual(g.name,     f.fileno())
3969        self.assertEqual(g.raw.name, f.fileno())
3970        f.close()
3971        g.close()
3972
3973    def test_open_pipe_with_append(self):
3974        # bpo-27805: Ignore ESPIPE from lseek() in open().
3975        r, w = os.pipe()
3976        self.addCleanup(os.close, r)
3977        f = self.open(w, 'a', encoding="utf-8")
3978        self.addCleanup(f.close)
3979        # Check that the file is marked non-seekable. On Windows, however, lseek
3980        # somehow succeeds on pipes.
3981        if sys.platform != 'win32':
3982            self.assertFalse(f.seekable())
3983
3984    def test_io_after_close(self):
3985        for kwargs in [
3986                {"mode": "w"},
3987                {"mode": "wb"},
3988                {"mode": "w", "buffering": 1},
3989                {"mode": "w", "buffering": 2},
3990                {"mode": "wb", "buffering": 0},
3991                {"mode": "r"},
3992                {"mode": "rb"},
3993                {"mode": "r", "buffering": 1},
3994                {"mode": "r", "buffering": 2},
3995                {"mode": "rb", "buffering": 0},
3996                {"mode": "w+"},
3997                {"mode": "w+b"},
3998                {"mode": "w+", "buffering": 1},
3999                {"mode": "w+", "buffering": 2},
4000                {"mode": "w+b", "buffering": 0},
4001            ]:
4002            if "b" not in kwargs["mode"]:
4003                kwargs["encoding"] = "utf-8"
4004            f = self.open(os_helper.TESTFN, **kwargs)
4005            f.close()
4006            self.assertRaises(ValueError, f.flush)
4007            self.assertRaises(ValueError, f.fileno)
4008            self.assertRaises(ValueError, f.isatty)
4009            self.assertRaises(ValueError, f.__iter__)
4010            if hasattr(f, "peek"):
4011                self.assertRaises(ValueError, f.peek, 1)
4012            self.assertRaises(ValueError, f.read)
4013            if hasattr(f, "read1"):
4014                self.assertRaises(ValueError, f.read1, 1024)
4015                self.assertRaises(ValueError, f.read1)
4016            if hasattr(f, "readall"):
4017                self.assertRaises(ValueError, f.readall)
4018            if hasattr(f, "readinto"):
4019                self.assertRaises(ValueError, f.readinto, bytearray(1024))
4020            if hasattr(f, "readinto1"):
4021                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
4022            self.assertRaises(ValueError, f.readline)
4023            self.assertRaises(ValueError, f.readlines)
4024            self.assertRaises(ValueError, f.readlines, 1)
4025            self.assertRaises(ValueError, f.seek, 0)
4026            self.assertRaises(ValueError, f.tell)
4027            self.assertRaises(ValueError, f.truncate)
4028            self.assertRaises(ValueError, f.write,
4029                              b"" if "b" in kwargs['mode'] else "")
4030            self.assertRaises(ValueError, f.writelines, [])
4031            self.assertRaises(ValueError, next, f)
4032
4033    def test_blockingioerror(self):
4034        # Various BlockingIOError issues
4035        class C(str):
4036            pass
4037        c = C("")
4038        b = self.BlockingIOError(1, c)
4039        c.b = b
4040        b.c = c
4041        wr = weakref.ref(c)
4042        del c, b
4043        support.gc_collect()
4044        self.assertIsNone(wr(), wr)
4045
4046    def test_abcs(self):
4047        # Test the visible base classes are ABCs.
4048        self.assertIsInstance(self.IOBase, abc.ABCMeta)
4049        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4050        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4051        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
4052
4053    def _check_abc_inheritance(self, abcmodule):
4054        with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
4055            self.assertIsInstance(f, abcmodule.IOBase)
4056            self.assertIsInstance(f, abcmodule.RawIOBase)
4057            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4058            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4059        with self.open(os_helper.TESTFN, "wb") as f:
4060            self.assertIsInstance(f, abcmodule.IOBase)
4061            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4062            self.assertIsInstance(f, abcmodule.BufferedIOBase)
4063            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4064        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
4065            self.assertIsInstance(f, abcmodule.IOBase)
4066            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4067            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4068            self.assertIsInstance(f, abcmodule.TextIOBase)
4069
    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        # The test case itself stands in for the module: the helper only
        # reads the ABCs as attributes of its argument.
        self._check_abc_inheritance(self)
4073
    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        # The module-level ``io`` is passed, whichever implementation this
        # test class targets.
        self._check_abc_inheritance(io)
4078
    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that dropping an unclosed file emits a ResourceWarning.

        *args* and *kwargs* are passed to open().  The warning message must
        contain the repr() the file object had while it was alive.
        """
        # NOTE(review): this calls the builtin open() rather than self.open,
        # so the Py* variant of this test may not be exercising _pyio here --
        # confirm whether that is intended.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop our reference and force a collection inside the
            # assertWarns() block, where the warning is expected.
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))
4086
4087    def test_warn_on_dealloc(self):
4088        self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4089        self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
4090        self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
4091
4092    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4093        fds = []
4094        def cleanup_fds():
4095            for fd in fds:
4096                try:
4097                    os.close(fd)
4098                except OSError as e:
4099                    if e.errno != errno.EBADF:
4100                        raise
4101        self.addCleanup(cleanup_fds)
4102        r, w = os.pipe()
4103        fds += r, w
4104        self._check_warn_on_dealloc(r, *args, **kwargs)
4105        # When using closefd=False, there's no warning
4106        r, w = os.pipe()
4107        fds += r, w
4108        with warnings_helper.check_no_resource_warning(self):
4109            open(r, *args, closefd=False, **kwargs)
4110
4111    def test_warn_on_dealloc_fd(self):
4112        self._check_warn_on_dealloc_fd("rb", buffering=0)
4113        self._check_warn_on_dealloc_fd("rb")
4114        self._check_warn_on_dealloc_fd("r", encoding="utf-8")
4115
4116
4117    def test_pickling(self):
4118        # Pickling file objects is forbidden
4119        for kwargs in [
4120                {"mode": "w"},
4121                {"mode": "wb"},
4122                {"mode": "wb", "buffering": 0},
4123                {"mode": "r"},
4124                {"mode": "rb"},
4125                {"mode": "rb", "buffering": 0},
4126                {"mode": "w+"},
4127                {"mode": "w+b"},
4128                {"mode": "w+b", "buffering": 0},
4129            ]:
4130            if "b" not in kwargs["mode"]:
4131                kwargs["encoding"] = "utf-8"
4132            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4133                with self.open(os_helper.TESTFN, **kwargs) as f:
4134                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
4135
    def test_nonblock_pipe_write_bigbuf(self):
        # Use a 16 KiB buffer for the buffered objects on both pipe ends.
        self._test_nonblock_pipe_write(16*1024)
4138
    def test_nonblock_pipe_write_smallbuf(self):
        # Use a 1 KiB buffer for the buffered objects on both pipe ends.
        self._test_nonblock_pipe_write(1024)
4141
    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe until writes block.

        Both ends of the pipe are wrapped in buffered objects created with
        ``buffering=bufsize``.  Whatever was reported as written must be
        read back unchanged, and both objects must end up closed.
        """
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Keep writing messages of N identical lowercase ASCII
                    # letters until a write raises BlockingIOError.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the part reported as written counts as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    # Read what is available, then write a marker.
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush whatever is still buffered, reading from the other end
            # every time the flush blocks.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Keep reading until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
4193
4194    def test_create_fail(self):
4195        # 'x' mode fails if file is existing
4196        with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
4197            pass
4198        self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
4199
4200    def test_create_writes(self):
4201        # 'x' mode opens for writing
4202        with self.open(os_helper.TESTFN, 'xb') as f:
4203            f.write(b"spam")
4204        with self.open(os_helper.TESTFN, 'rb') as f:
4205            self.assertEqual(b"spam", f.read())
4206
4207    def test_open_allargs(self):
4208        # there used to be a buffer overflow in the parser for rawmode
4209        self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
4210
    def test_check_encoding_errors(self):
        # bpo-37388: open() and TextIOWrapper must check encoding and errors
        # arguments in dev mode
        mod = self.io.__name__
        filename = __file__
        invalid = 'Boom, Shaka Laka, Boom!'
        # Script run in a child interpreter.  Each of the four checks exits
        # with its own status (21 to 24) when the invalid name does not raise
        # LookupError; status 10 means that all four checks passed.
        code = textwrap.dedent(f'''
            import sys
            from {mod} import open, TextIOWrapper

            try:
                open({filename!r}, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(21)

            try:
                open({filename!r}, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(22)

            fp = open({filename!r}, "rb")
            with fp:
                try:
                    TextIOWrapper(fp, encoding={invalid!r})
                except LookupError:
                    pass
                else:
                    sys.exit(23)

                try:
                    TextIOWrapper(fp, errors={invalid!r})
                except LookupError:
                    pass
                else:
                    sys.exit(24)

            sys.exit(10)
        ''')
        # The script always exits with a non-zero status, hence
        # assert_python_failure(); the exact status is checked afterwards.
        proc = assert_python_failure('-X', 'dev', '-c', code)
        self.assertEqual(proc.rc, 10, proc)
4255
4256    def test_check_encoding_warning(self):
4257        # PEP 597: Raise warning when encoding is not specified
4258        # and sys.flags.warn_default_encoding is set.
4259        mod = self.io.__name__
4260        filename = __file__
4261        code = textwrap.dedent(f'''\
4262            import sys
4263            from {mod} import open, TextIOWrapper
4264            import pathlib
4265
4266            with open({filename!r}) as f:           # line 5
4267                pass
4268
4269            pathlib.Path({filename!r}).read_text()  # line 8
4270        ''')
4271        proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
4272        warnings = proc.err.splitlines()
4273        self.assertEqual(len(warnings), 2)
4274        self.assertTrue(
4275            warnings[0].startswith(b"<string>:5: EncodingWarning: "))
4276        self.assertTrue(
4277            warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4278
    @support.cpython_only
    # Depending on whether OpenWrapper was already created, the warning may
    # or may not be emitted.  For example, the attribute is already created
    # when this test is run multiple times.  The DeprecationWarning is
    # therefore ignored rather than asserted.
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_openwrapper(self):
        # OpenWrapper must be the very same object as open.
        self.assertIs(self.io.OpenWrapper, self.io.open)
4286
4287
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases against the C implementation and adds a few
    # checks of its own.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() returns far more data than a 2-byte buffer can hold.
            def read(self, n=-1):
                return b'x' * 10**6
        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter in which a daemon thread and the main
        thread both write to ``sys.<stream_name>`` until shutdown.

        The child must either exit cleanly with nothing but '.' and '!' on
        stderr, or fail with the expected fatal error message.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result = run_python_until_end("-c", script)[0]
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Success: nothing but the written characters may be left.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = (r"Fatal Python error: _enter_buffered_busy: "
                    r"could not acquire lock "
                    r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                    r"at interpreter shutdown, possibly due to "
                    r"daemon threads".format(stream_name=stream_name))
        self.assertRegex(stderr_text, expected)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        stream = 'stdout'
        self.check_daemon_threads_shutdown_deadlock(stream)

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        stream = 'stderr'
        self.check_daemon_threads_shutdown_deadlock(stream)
4346
4347
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases against the pure-Python implementation.
    io = pyio
4350
4351
4352@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4353class SignalsTest(unittest.TestCase):
4354
    def setUp(self):
        # Install alarm_interrupt() as the SIGALRM handler and remember the
        # previous handler so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4357
    def tearDown(self):
        # Restore the SIGALRM handler saved by setUp().
        signal.signal(signal.SIGALRM, self.oldalrm)
4360
    def alarm_interrupt(self, sig, frame):
        # Signal handler that deliberately raises ZeroDivisionError, which
        # the tests then expect to see raised by the interrupted call.
        1/0
4363
4364    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4365        """Check that a partial write, when it gets interrupted, properly
4366        invokes the signal handler, and bubbles up the exception raised
4367        in the latter."""
4368
4369        # XXX This test has three flaws that appear when objects are
4370        # XXX not reference counted.
4371
4372        # - if wio.write() happens to trigger a garbage collection,
4373        #   the signal exception may be raised when some __del__
4374        #   method is running; it will not reach the assertRaises()
4375        #   call.
4376
4377        # - more subtle, if the wio object is not destroyed at once
4378        #   and survives this function, the next opened file is likely
4379        #   to have the same fileno (since the file descriptor was
4380        #   actively closed).  When wio.__del__ is finally called, it
4381        #   will close the other's test file...  To trigger this with
4382        #   CPython, try adding "global wio" in this function.
4383
4384        # - This happens only for streams created by the _pyio module,
4385        #   because a wio.close() that fails still consider that the
4386        #   file needs to be closed again.  You can try adding an
4387        #   "assert wio.closed" at the end of the function.
4388
4389        # Fortunately, a little gc.collect() seems to be enough to
4390        # work around all these issues.
4391        support.gc_collect()  # For PyPy or other GCs.
4392
4393        read_results = []
4394        def _read():
4395            s = os.read(r, 1)
4396            read_results.append(s)
4397
4398        t = threading.Thread(target=_read)
4399        t.daemon = True
4400        r, w = os.pipe()
4401        fdopen_kwargs["closefd"] = False
4402        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
4403        try:
4404            wio = self.io.open(w, **fdopen_kwargs)
4405            if hasattr(signal, 'pthread_sigmask'):
4406                # create the thread with SIGALRM signal blocked
4407                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4408                t.start()
4409                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4410            else:
4411                t.start()
4412
4413            # Fill the pipe enough that the write will be blocking.
4414            # It will be interrupted by the timer armed above.  Since the
4415            # other thread has read one byte, the low-level write will
4416            # return with a successful (partial) result rather than an EINTR.
4417            # The buffered IO layer must check for pending signal
4418            # handlers, which in this case will invoke alarm_interrupt().
4419            signal.alarm(1)
4420            try:
4421                self.assertRaises(ZeroDivisionError, wio.write, large_data)
4422            finally:
4423                signal.alarm(0)
4424                t.join()
4425            # We got one byte, get another one and check that it isn't a
4426            # repeat of the first one.
4427            read_results.append(os.read(r, 1))
4428            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4429        finally:
4430            os.close(w)
4431            os.close(r)
4432            # This is deliberate. If we didn't close the file descriptor
4433            # before closing wio, wio would try to flush its internal
4434            # buffer, and block again.
4435            try:
4436                wio.close()
4437            except OSError as e:
4438                if e.errno != errno.EBADF:
4439                    raise
4440
    def test_interrupted_write_unbuffered(self):
        # Binary writer opened with buffering=0.
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4443
    def test_interrupted_write_buffered(self):
        # Binary writer opened with the default buffering.
        self.check_interrupted_write(b"xy", b"xy", mode="wb")
4446
    def test_interrupted_write_text(self):
        # Text writer: str items are written, bytes are read from the pipe.
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4449
    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler during a write().

        *data* is written and flushed in a loop to a pipe opened with
        *fdopen_kwargs*, while a SIGALRM handler writes to the same object.
        The loop must end with either a RuntimeError whose message starts
        with "reentrant call", or the ZeroDivisionError raised by the
        handler.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        # This replaces the handler installed by setUp(); tearDown() restores
        # the handler that was saved there.
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel any pending alarm before releasing the pipe.
            signal.alarm(0)
            wio.close()
            os.close(r)
4477
    def test_reentrant_write_buffered(self):
        # Binary writer opened with the default buffering.
        self.check_reentrant_write(b"xy", mode="wb")
4480
    def test_reentrant_write_text(self):
        # Text writer using the ASCII encoding.
        self.check_reentrant_write("xy", mode="w", encoding="ascii")
4483
4484    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4485        """Check that a buffered read, when it gets interrupted (either
4486        returning a partial result or EINTR), properly invokes the signal
4487        handler and retries if the latter returned successfully."""
4488        r, w = os.pipe()
4489        fdopen_kwargs["closefd"] = False
4490        def alarm_handler(sig, frame):
4491            os.write(w, b"bar")
4492        signal.signal(signal.SIGALRM, alarm_handler)
4493        try:
4494            rio = self.io.open(r, **fdopen_kwargs)
4495            os.write(w, b"foo")
4496            signal.alarm(1)
4497            # Expected behaviour:
4498            # - first raw read() returns partial b"foo"
4499            # - second raw read() returns EINTR
4500            # - third raw read() returns b"bar"
4501            self.assertEqual(decode(rio.read(6)), "foobar")
4502        finally:
4503            signal.alarm(0)
4504            rio.close()
4505            os.close(w)
4506            os.close(r)
4507
    def test_interrupted_read_retry_buffered(self):
        # Binary reader: the bytes read are decoded before the comparison.
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")
4511
    def test_interrupted_read_retry_text(self):
        # Text reader: read() already returns str, so nothing to decode.
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r", encoding="latin1")
4515
4516    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4517        """Check that a buffered write, when it gets interrupted (either
4518        returning a partial result or EINTR), properly invokes the signal
4519        handler and retries if the latter returned successfully."""
4520        select = import_helper.import_module("select")
4521
4522        # A quantity that exceeds the buffer size of an anonymous pipe's
4523        # write end.
4524        N = support.PIPE_MAX_SIZE
4525        r, w = os.pipe()
4526        fdopen_kwargs["closefd"] = False
4527
4528        # We need a separate thread to read from the pipe and allow the
4529        # write() to finish.  This thread is started after the SIGALRM is
4530        # received (forcing a first EINTR in write()).
4531        read_results = []
4532        write_finished = False
4533        error = None
4534        def _read():
4535            try:
4536                while not write_finished:
4537                    while r in select.select([r], [], [], 1.0)[0]:
4538                        s = os.read(r, 1024)
4539                        read_results.append(s)
4540            except BaseException as exc:
4541                nonlocal error
4542                error = exc
4543        t = threading.Thread(target=_read)
4544        t.daemon = True
4545        def alarm1(sig, frame):
4546            signal.signal(signal.SIGALRM, alarm2)
4547            signal.alarm(1)
4548        def alarm2(sig, frame):
4549            t.start()
4550
4551        large_data = item * N
4552        signal.signal(signal.SIGALRM, alarm1)
4553        try:
4554            wio = self.io.open(w, **fdopen_kwargs)
4555            signal.alarm(1)
4556            # Expected behaviour:
4557            # - first raw write() is partial (because of the limited pipe buffer
4558            #   and the first alarm)
4559            # - second raw write() returns EINTR (because of the second alarm)
4560            # - subsequent write()s are successful (either partial or complete)
4561            written = wio.write(large_data)
4562            self.assertEqual(N, written)
4563
4564            wio.flush()
4565            write_finished = True
4566            t.join()
4567
4568            self.assertIsNone(error)
4569            self.assertEqual(N, sum(len(x) for x in read_results))
4570        finally:
4571            signal.alarm(0)
4572            write_finished = True
4573            os.close(w)
4574            os.close(r)
4575            # This is deliberate. If we didn't close the file descriptor
4576            # before closing wio, wio would try to flush its internal
4577            # buffer, and could block (in case of failure).
4578            try:
4579                wio.close()
4580            except OSError as e:
4581                if e.errno != errno.EBADF:
4582                    raise
4583
    def test_interrupted_write_retry_buffered(self):
        # Binary writer opened with the default buffering.
        self.check_interrupted_write_retry(b"x", mode="wb")
4586
    def test_interrupted_write_retry_text(self):
        # Text writer using the latin1 encoding.
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4589
4590
class CSignalsTest(SignalsTest):
    # Runs the SignalsTest cases against the C implementation.
    io = io
4593
class PySignalsTest(SignalsTest):
    # Runs the SignalsTest cases against the pure-Python implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  The inherited test methods are overridden with
    # None for that purpose.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
4601
4602
def load_tests(*args):
    """Build the test suite of this module (``load_tests`` protocol hook).

    Before the suite is built, every listed test class receives, as class
    attributes, the names of the implementation it targets: classes whose
    name starts with "C" get the ``io`` names and the ``C``-prefixed mocks,
    classes whose name starts with "Py" get the ``_pyio`` names and the
    ``Py``-prefixed mocks.  Other classes are left untouched.

    When the first positional argument is a ``unittest.TestLoader`` (as it
    is when unittest calls this hook), that loader collects the tests;
    otherwise ``unittest.defaultTestLoader`` is used.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is published under its unprefixed name, bound to the variant
    # matching the implementation.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; TestLoader.loadTestsFromTestCase() is its replacement.
    if args and isinstance(args[0], unittest.TestLoader):
        loader = args[0]
    else:
        loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        loader.loadTestsFromTestCase(test) for test in tests)
    return suite
4637
if __name__ == "__main__":
    # Run the tests when this file is executed as a script.
    unittest.main()
4640