• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 """Unit tests for the io module."""
2 
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 #     (only enabled with -ulargefile)
12 
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as an attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
21 
22 import abc
23 import array
24 import errno
25 import locale
26 import os
27 import pickle
28 import random
29 import signal
30 import sys
31 import sysconfig
32 import textwrap
33 import threading
34 import time
35 import unittest
36 import warnings
37 import weakref
38 from collections import deque, UserList
39 from itertools import cycle, count
40 from test import support
41 from test.support.script_helper import (
42     assert_python_ok, assert_python_failure, run_python_until_end)
43 from test.support import import_helper
44 from test.support import os_helper
45 from test.support import threading_helper
46 from test.support import warnings_helper
47 from test.support.os_helper import FakePath
48 
49 import codecs
50 import io  # C implementation of io
51 import _pyio as pyio # Python implementation of io
52 
53 try:
54     import ctypes
55 except ImportError:
56     def byteslike(*pos, **kw):
57         return array.array("b", bytes(*pos, **kw))
58 else:
59     def byteslike(*pos, **kw):
60         """Create a bytes-like object having no string or sequence methods"""
61         data = bytes(*pos, **kw)
62         obj = EmptyStruct()
63         ctypes.resize(obj, len(data))
64         memoryview(obj).cast("B")[:] = data
65         return obj
66     class EmptyStruct(ctypes.Structure):
67         pass
68 
69 _cflags = sysconfig.get_config_var('CFLAGS') or ''
70 _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
71 MEMORY_SANITIZER = (
72     '-fsanitize=memory' in _cflags or
73     '--with-memory-sanitizer' in _config_args
74 )
75 
76 ADDRESS_SANITIZER = (
77     '-fsanitize=address' in _cflags
78 )
79 
80 # Does io.IOBase finalizer log the exception if the close() method fails?
81 # The exception is ignored silently by default in release build.
82 IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
83 
84 
85 def _default_chunk_size():
86     """Get the default TextIOWrapper chunk size"""
87     with open(__file__, "r", encoding="latin-1") as f:
88         return f._CHUNK_SIZE
89 
90 
91 class MockRawIOWithoutRead:
92     """A RawIO implementation without read(), so as to exercise the default
93     RawIO.read() which calls readinto()."""
94 
95     def __init__(self, read_stack=()):
96         self._read_stack = list(read_stack)
97         self._write_stack = []
98         self._reads = 0
99         self._extraneous_reads = 0
100 
101     def write(self, b):
102         self._write_stack.append(bytes(b))
103         return len(b)
104 
105     def writable(self):
106         return True
107 
108     def fileno(self):
109         return 42
110 
111     def readable(self):
112         return True
113 
114     def seekable(self):
115         return True
116 
117     def seek(self, pos, whence):
118         return 0   # wrong but we gotta return something
119 
120     def tell(self):
121         return 0   # same comment as above
122 
123     def readinto(self, buf):
124         self._reads += 1
125         max_len = len(buf)
126         try:
127             data = self._read_stack[0]
128         except IndexError:
129             self._extraneous_reads += 1
130             return 0
131         if data is None:
132             del self._read_stack[0]
133             return None
134         n = len(data)
135         if len(data) <= max_len:
136             del self._read_stack[0]
137             buf[:n] = data
138             return n
139         else:
140             buf[:] = data[:max_len]
141             self._read_stack[0] = data[max_len:]
142             return max_len
143 
144     def truncate(self, pos=None):
145         return pos
146 
147 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
148     pass
149 
150 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
151     pass
152 
153 
154 class MockRawIO(MockRawIOWithoutRead):
155 
156     def read(self, n=None):
157         self._reads += 1
158         try:
159             return self._read_stack.pop(0)
160         except:
161             self._extraneous_reads += 1
162             return b""
163 
164 class CMockRawIO(MockRawIO, io.RawIOBase):
165     pass
166 
167 class PyMockRawIO(MockRawIO, pyio.RawIOBase):
168     pass
169 
170 
171 class MisbehavedRawIO(MockRawIO):
172     def write(self, b):
173         return super().write(b) * 2
174 
175     def read(self, n=None):
176         return super().read(n) * 2
177 
178     def seek(self, pos, whence):
179         return -123
180 
181     def tell(self):
182         return -456
183 
184     def readinto(self, buf):
185         super().readinto(buf)
186         return len(buf) * 5
187 
188 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
189     pass
190 
191 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
192     pass
193 
194 
195 class SlowFlushRawIO(MockRawIO):
196     def __init__(self):
197         super().__init__()
198         self.in_flush = threading.Event()
199 
200     def flush(self):
201         self.in_flush.set()
202         time.sleep(0.25)
203 
204 class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
205     pass
206 
207 class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
208     pass
209 
210 
211 class CloseFailureIO(MockRawIO):
212     closed = 0
213 
214     def close(self):
215         if not self.closed:
216             self.closed = 1
217             raise OSError
218 
219 class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
220     pass
221 
222 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
223     pass
224 
225 
226 class MockFileIO:
227 
228     def __init__(self, data):
229         self.read_history = []
230         super().__init__(data)
231 
232     def read(self, n=None):
233         res = super().read(n)
234         self.read_history.append(None if res is None else len(res))
235         return res
236 
237     def readinto(self, b):
238         res = super().readinto(b)
239         self.read_history.append(res)
240         return res
241 
242 class CMockFileIO(MockFileIO, io.BytesIO):
243     pass
244 
245 class PyMockFileIO(MockFileIO, pyio.BytesIO):
246     pass
247 
248 
249 class MockUnseekableIO:
250     def seekable(self):
251         return False
252 
253     def seek(self, *args):
254         raise self.UnsupportedOperation("not seekable")
255 
256     def tell(self, *args):
257         raise self.UnsupportedOperation("not seekable")
258 
259     def truncate(self, *args):
260         raise self.UnsupportedOperation("not seekable")
261 
262 class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
263     UnsupportedOperation = io.UnsupportedOperation
264 
265 class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
266     UnsupportedOperation = pyio.UnsupportedOperation
267 
268 
269 class MockNonBlockWriterIO:
270 
271     def __init__(self):
272         self._write_stack = []
273         self._blocker_char = None
274 
275     def pop_written(self):
276         s = b"".join(self._write_stack)
277         self._write_stack[:] = []
278         return s
279 
280     def block_on(self, char):
281         """Block when a given char is encountered."""
282         self._blocker_char = char
283 
284     def readable(self):
285         return True
286 
287     def seekable(self):
288         return True
289 
290     def seek(self, pos, whence=0):
291         # naive implementation, enough for tests
292         return 0
293 
294     def writable(self):
295         return True
296 
297     def write(self, b):
298         b = bytes(b)
299         n = -1
300         if self._blocker_char:
301             try:
302                 n = b.index(self._blocker_char)
303             except ValueError:
304                 pass
305             else:
306                 if n > 0:
307                     # write data up to the first blocker
308                     self._write_stack.append(b[:n])
309                     return n
310                 else:
311                     # cancel blocker and indicate would block
312                     self._blocker_char = None
313                     return None
314         self._write_stack.append(b)
315         return len(b)
316 
317 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
318     BlockingIOError = io.BlockingIOError
319 
320 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
321     BlockingIOError = pyio.BlockingIOError
322 
323 
324 class IOTest(unittest.TestCase):
325 
326     def setUp(self):
327         os_helper.unlink(os_helper.TESTFN)
328 
329     def tearDown(self):
330         os_helper.unlink(os_helper.TESTFN)
331 
332     def write_ops(self, f):
333         self.assertEqual(f.write(b"blah."), 5)
334         f.truncate(0)
335         self.assertEqual(f.tell(), 5)
336         f.seek(0)
337 
338         self.assertEqual(f.write(b"blah."), 5)
339         self.assertEqual(f.seek(0), 0)
340         self.assertEqual(f.write(b"Hello."), 6)
341         self.assertEqual(f.tell(), 6)
342         self.assertEqual(f.seek(-1, 1), 5)
343         self.assertEqual(f.tell(), 5)
344         buffer = bytearray(b" world\n\n\n")
345         self.assertEqual(f.write(buffer), 9)
346         buffer[:] = b"*" * 9  # Overwrite our copy of the data
347         self.assertEqual(f.seek(0), 0)
348         self.assertEqual(f.write(b"h"), 1)
349         self.assertEqual(f.seek(-1, 2), 13)
350         self.assertEqual(f.tell(), 13)
351 
352         self.assertEqual(f.truncate(12), 12)
353         self.assertEqual(f.tell(), 13)
354         self.assertRaises(TypeError, f.seek, 0.0)
355 
356     def read_ops(self, f, buffered=False):
357         data = f.read(5)
358         self.assertEqual(data, b"hello")
359         data = byteslike(data)
360         self.assertEqual(f.readinto(data), 5)
361         self.assertEqual(bytes(data), b" worl")
362         data = bytearray(5)
363         self.assertEqual(f.readinto(data), 2)
364         self.assertEqual(len(data), 5)
365         self.assertEqual(data[:2], b"d\n")
366         self.assertEqual(f.seek(0), 0)
367         self.assertEqual(f.read(20), b"hello world\n")
368         self.assertEqual(f.read(1), b"")
369         self.assertEqual(f.readinto(byteslike(b"x")), 0)
370         self.assertEqual(f.seek(-6, 2), 6)
371         self.assertEqual(f.read(5), b"world")
372         self.assertEqual(f.read(0), b"")
373         self.assertEqual(f.readinto(byteslike()), 0)
374         self.assertEqual(f.seek(-6, 1), 5)
375         self.assertEqual(f.read(5), b" worl")
376         self.assertEqual(f.tell(), 10)
377         self.assertRaises(TypeError, f.seek, 0.0)
378         if buffered:
379             f.seek(0)
380             self.assertEqual(f.read(), b"hello world\n")
381             f.seek(6)
382             self.assertEqual(f.read(), b"world\n")
383             self.assertEqual(f.read(), b"")
384             f.seek(0)
385             data = byteslike(5)
386             self.assertEqual(f.readinto1(data), 5)
387             self.assertEqual(bytes(data), b"hello")
388 
389     LARGE = 2**31
390 
391     def large_file_ops(self, f):
392         assert f.readable()
393         assert f.writable()
394         try:
395             self.assertEqual(f.seek(self.LARGE), self.LARGE)
396         except (OverflowError, ValueError):
397             self.skipTest("no largefile support")
398         self.assertEqual(f.tell(), self.LARGE)
399         self.assertEqual(f.write(b"xxx"), 3)
400         self.assertEqual(f.tell(), self.LARGE + 3)
401         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
402         self.assertEqual(f.truncate(), self.LARGE + 2)
403         self.assertEqual(f.tell(), self.LARGE + 2)
404         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
405         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
406         self.assertEqual(f.tell(), self.LARGE + 2)
407         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
408         self.assertEqual(f.seek(-1, 2), self.LARGE)
409         self.assertEqual(f.read(2), b"x")
410 
411     def test_invalid_operations(self):
412         # Try writing on a file opened in read mode and vice-versa.
413         exc = self.UnsupportedOperation
414         with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
415             self.assertRaises(exc, fp.read)
416             self.assertRaises(exc, fp.readline)
417         with self.open(os_helper.TESTFN, "wb") as fp:
418             self.assertRaises(exc, fp.read)
419             self.assertRaises(exc, fp.readline)
420         with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
421             self.assertRaises(exc, fp.read)
422             self.assertRaises(exc, fp.readline)
423         with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
424             self.assertRaises(exc, fp.write, b"blah")
425             self.assertRaises(exc, fp.writelines, [b"blah\n"])
426         with self.open(os_helper.TESTFN, "rb") as fp:
427             self.assertRaises(exc, fp.write, b"blah")
428             self.assertRaises(exc, fp.writelines, [b"blah\n"])
429         with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
430             self.assertRaises(exc, fp.write, "blah")
431             self.assertRaises(exc, fp.writelines, ["blah\n"])
432             # Non-zero seeking from current or end pos
433             self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
434             self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
435 
436     def test_optional_abilities(self):
437         # Test for OSError when optional APIs are not supported
438         # The purpose of this test is to try fileno(), reading, writing and
439         # seeking operations with various objects that indicate they do not
440         # support these operations.
441 
442         def pipe_reader():
443             [r, w] = os.pipe()
444             os.close(w)  # So that read() is harmless
445             return self.FileIO(r, "r")
446 
447         def pipe_writer():
448             [r, w] = os.pipe()
449             self.addCleanup(os.close, r)
450             # Guarantee that we can write into the pipe without blocking
451             thread = threading.Thread(target=os.read, args=(r, 100))
452             thread.start()
453             self.addCleanup(thread.join)
454             return self.FileIO(w, "w")
455 
456         def buffered_reader():
457             return self.BufferedReader(self.MockUnseekableIO())
458 
459         def buffered_writer():
460             return self.BufferedWriter(self.MockUnseekableIO())
461 
462         def buffered_random():
463             return self.BufferedRandom(self.BytesIO())
464 
465         def buffered_rw_pair():
466             return self.BufferedRWPair(self.MockUnseekableIO(),
467                 self.MockUnseekableIO())
468 
469         def text_reader():
470             class UnseekableReader(self.MockUnseekableIO):
471                 writable = self.BufferedIOBase.writable
472                 write = self.BufferedIOBase.write
473             return self.TextIOWrapper(UnseekableReader(), "ascii")
474 
475         def text_writer():
476             class UnseekableWriter(self.MockUnseekableIO):
477                 readable = self.BufferedIOBase.readable
478                 read = self.BufferedIOBase.read
479             return self.TextIOWrapper(UnseekableWriter(), "ascii")
480 
481         tests = (
482             (pipe_reader, "fr"), (pipe_writer, "fw"),
483             (buffered_reader, "r"), (buffered_writer, "w"),
484             (buffered_random, "rws"), (buffered_rw_pair, "rw"),
485             (text_reader, "r"), (text_writer, "w"),
486             (self.BytesIO, "rws"), (self.StringIO, "rws"),
487         )
488         for [test, abilities] in tests:
489             with self.subTest(test), test() as obj:
490                 readable = "r" in abilities
491                 self.assertEqual(obj.readable(), readable)
492                 writable = "w" in abilities
493                 self.assertEqual(obj.writable(), writable)
494 
495                 if isinstance(obj, self.TextIOBase):
496                     data = "3"
497                 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
498                     data = b"3"
499                 else:
500                     self.fail("Unknown base class")
501 
502                 if "f" in abilities:
503                     obj.fileno()
504                 else:
505                     self.assertRaises(OSError, obj.fileno)
506 
507                 if readable:
508                     obj.read(1)
509                     obj.read()
510                 else:
511                     self.assertRaises(OSError, obj.read, 1)
512                     self.assertRaises(OSError, obj.read)
513 
514                 if writable:
515                     obj.write(data)
516                 else:
517                     self.assertRaises(OSError, obj.write, data)
518 
519                 if sys.platform.startswith("win") and test in (
520                         pipe_reader, pipe_writer):
521                     # Pipes seem to appear as seekable on Windows
522                     continue
523                 seekable = "s" in abilities
524                 self.assertEqual(obj.seekable(), seekable)
525 
526                 if seekable:
527                     obj.tell()
528                     obj.seek(0)
529                 else:
530                     self.assertRaises(OSError, obj.tell)
531                     self.assertRaises(OSError, obj.seek, 0)
532 
533                 if writable and seekable:
534                     obj.truncate()
535                     obj.truncate(0)
536                 else:
537                     self.assertRaises(OSError, obj.truncate)
538                     self.assertRaises(OSError, obj.truncate, 0)
539 
540     def test_open_handles_NUL_chars(self):
541         fn_with_NUL = 'foo\0bar'
542         self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
543 
544         bytes_fn = bytes(fn_with_NUL, 'ascii')
545         with warnings.catch_warnings():
546             warnings.simplefilter("ignore", DeprecationWarning)
547             self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
548 
549     def test_raw_file_io(self):
550         with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
551             self.assertEqual(f.readable(), False)
552             self.assertEqual(f.writable(), True)
553             self.assertEqual(f.seekable(), True)
554             self.write_ops(f)
555         with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
556             self.assertEqual(f.readable(), True)
557             self.assertEqual(f.writable(), False)
558             self.assertEqual(f.seekable(), True)
559             self.read_ops(f)
560 
561     def test_buffered_file_io(self):
562         with self.open(os_helper.TESTFN, "wb") as f:
563             self.assertEqual(f.readable(), False)
564             self.assertEqual(f.writable(), True)
565             self.assertEqual(f.seekable(), True)
566             self.write_ops(f)
567         with self.open(os_helper.TESTFN, "rb") as f:
568             self.assertEqual(f.readable(), True)
569             self.assertEqual(f.writable(), False)
570             self.assertEqual(f.seekable(), True)
571             self.read_ops(f, True)
572 
573     def test_readline(self):
574         with self.open(os_helper.TESTFN, "wb") as f:
575             f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
576         with self.open(os_helper.TESTFN, "rb") as f:
577             self.assertEqual(f.readline(), b"abc\n")
578             self.assertEqual(f.readline(10), b"def\n")
579             self.assertEqual(f.readline(2), b"xy")
580             self.assertEqual(f.readline(4), b"zzy\n")
581             self.assertEqual(f.readline(), b"foo\x00bar\n")
582             self.assertEqual(f.readline(None), b"another line")
583             self.assertRaises(TypeError, f.readline, 5.3)
584         with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
585             self.assertRaises(TypeError, f.readline, 5.3)
586 
587     def test_readline_nonsizeable(self):
588         # Issue #30061
589         # Crash when readline() returns an object without __len__
590         class R(self.IOBase):
591             def readline(self):
592                 return None
593         self.assertRaises((TypeError, StopIteration), next, R())
594 
595     def test_next_nonsizeable(self):
596         # Issue #30061
597         # Crash when __next__() returns an object without __len__
598         class R(self.IOBase):
599             def __next__(self):
600                 return None
601         self.assertRaises(TypeError, R().readlines, 1)
602 
603     def test_raw_bytes_io(self):
604         f = self.BytesIO()
605         self.write_ops(f)
606         data = f.getvalue()
607         self.assertEqual(data, b"hello world\n")
608         f = self.BytesIO(data)
609         self.read_ops(f, True)
610 
611     def test_large_file_ops(self):
612         # On Windows and Mac OSX this test consumes large resources; It takes
613         # a long time to build the >2 GiB file and takes >2 GiB of disk space
614         # therefore the resource must be enabled to run this test.
615         if sys.platform[:3] == 'win' or sys.platform == 'darwin':
616             support.requires(
617                 'largefile',
618                 'test requires %s bytes and a long time to run' % self.LARGE)
619         with self.open(os_helper.TESTFN, "w+b", 0) as f:
620             self.large_file_ops(f)
621         with self.open(os_helper.TESTFN, "w+b") as f:
622             self.large_file_ops(f)
623 
624     def test_with_open(self):
625         for bufsize in (0, 100):
626             f = None
627             with self.open(os_helper.TESTFN, "wb", bufsize) as f:
628                 f.write(b"xxx")
629             self.assertEqual(f.closed, True)
630             f = None
631             try:
632                 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
633                     1/0
634             except ZeroDivisionError:
635                 self.assertEqual(f.closed, True)
636             else:
637                 self.fail("1/0 didn't raise an exception")
638 
639     # issue 5008
640     def test_append_mode_tell(self):
641         with self.open(os_helper.TESTFN, "wb") as f:
642             f.write(b"xxx")
643         with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
644             self.assertEqual(f.tell(), 3)
645         with self.open(os_helper.TESTFN, "ab") as f:
646             self.assertEqual(f.tell(), 3)
647         with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
648             self.assertGreater(f.tell(), 0)
649 
650     def test_destructor(self):
651         record = []
652         class MyFileIO(self.FileIO):
653             def __del__(self):
654                 record.append(1)
655                 try:
656                     f = super().__del__
657                 except AttributeError:
658                     pass
659                 else:
660                     f()
661             def close(self):
662                 record.append(2)
663                 super().close()
664             def flush(self):
665                 record.append(3)
666                 super().flush()
667         with warnings_helper.check_warnings(('', ResourceWarning)):
668             f = MyFileIO(os_helper.TESTFN, "wb")
669             f.write(b"xxx")
670             del f
671             support.gc_collect()
672             self.assertEqual(record, [1, 2, 3])
673             with self.open(os_helper.TESTFN, "rb") as f:
674                 self.assertEqual(f.read(), b"xxx")
675 
676     def _check_base_destructor(self, base):
677         record = []
678         class MyIO(base):
679             def __init__(self):
680                 # This exercises the availability of attributes on object
681                 # destruction.
682                 # (in the C version, close() is called by the tp_dealloc
683                 # function, not by __del__)
684                 self.on_del = 1
685                 self.on_close = 2
686                 self.on_flush = 3
687             def __del__(self):
688                 record.append(self.on_del)
689                 try:
690                     f = super().__del__
691                 except AttributeError:
692                     pass
693                 else:
694                     f()
695             def close(self):
696                 record.append(self.on_close)
697                 super().close()
698             def flush(self):
699                 record.append(self.on_flush)
700                 super().flush()
701         f = MyIO()
702         del f
703         support.gc_collect()
704         self.assertEqual(record, [1, 2, 3])
705 
706     def test_IOBase_destructor(self):
707         self._check_base_destructor(self.IOBase)
708 
709     def test_RawIOBase_destructor(self):
710         self._check_base_destructor(self.RawIOBase)
711 
712     def test_BufferedIOBase_destructor(self):
713         self._check_base_destructor(self.BufferedIOBase)
714 
715     def test_TextIOBase_destructor(self):
716         self._check_base_destructor(self.TextIOBase)
717 
718     def test_close_flushes(self):
719         with self.open(os_helper.TESTFN, "wb") as f:
720             f.write(b"xxx")
721         with self.open(os_helper.TESTFN, "rb") as f:
722             self.assertEqual(f.read(), b"xxx")
723 
724     def test_array_writes(self):
725         a = array.array('i', range(10))
726         n = len(a.tobytes())
727         def check(f):
728             with f:
729                 self.assertEqual(f.write(a), n)
730                 f.writelines((a,))
731         check(self.BytesIO())
732         check(self.FileIO(os_helper.TESTFN, "w"))
733         check(self.BufferedWriter(self.MockRawIO()))
734         check(self.BufferedRandom(self.MockRawIO()))
735         check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
736 
737     def test_closefd(self):
738         self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
739                           encoding="utf-8", closefd=False)
740 
741     def test_read_closed(self):
742         with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
743             f.write("egg\n")
744         with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
745             file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
746             self.assertEqual(file.read(), "egg\n")
747             file.seek(0)
748             file.close()
749             self.assertRaises(ValueError, file.read)
750         with self.open(os_helper.TESTFN, "rb") as f:
751             file = self.open(f.fileno(), "rb", closefd=False)
752             self.assertEqual(file.read()[:3], b"egg")
753             file.close()
754             self.assertRaises(ValueError, file.readinto, bytearray(1))
755 
756     def test_no_closefd_with_filename(self):
757         # can't use closefd in combination with a file name
758         self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
759                           encoding="utf-8", closefd=False)
760 
761     def test_closefd_attr(self):
762         with self.open(os_helper.TESTFN, "wb") as f:
763             f.write(b"egg\n")
764         with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
765             self.assertEqual(f.buffer.raw.closefd, True)
766             file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
767             self.assertEqual(file.buffer.raw.closefd, False)
768 
769     def test_garbage_collection(self):
770         # FileIO objects are collected, and collecting them flushes
771         # all data to disk.
772         with warnings_helper.check_warnings(('', ResourceWarning)):
773             f = self.FileIO(os_helper.TESTFN, "wb")
774             f.write(b"abcxxx")
775             f.f = f
776             wr = weakref.ref(f)
777             del f
778             support.gc_collect()
779         self.assertIsNone(wr(), wr)
780         with self.open(os_helper.TESTFN, "rb") as f:
781             self.assertEqual(f.read(), b"abcxxx")
782 
783     def test_unbounded_file(self):
784         # Issue #1174606: reading from an unbounded stream such as /dev/zero.
785         zero = "/dev/zero"
786         if not os.path.exists(zero):
787             self.skipTest("{0} does not exist".format(zero))
788         if sys.maxsize > 0x7FFFFFFF:
789             self.skipTest("test can only run in a 32-bit address space")
790         if support.real_max_memuse < support._2G:
791             self.skipTest("test requires at least 2 GiB of memory")
792         with self.open(zero, "rb", buffering=0) as f:
793             self.assertRaises(OverflowError, f.read)
794         with self.open(zero, "rb") as f:
795             self.assertRaises(OverflowError, f.read)
796         with self.open(zero, "r") as f:
797             self.assertRaises(OverflowError, f.read)
798 
799     def check_flush_error_on_close(self, *args, **kwargs):
800         # Test that the file is closed despite failed flush
801         # and that flush() is called before file closed.
802         f = self.open(*args, **kwargs)
803         closed = []
804         def bad_flush():
805             closed[:] = [f.closed]
806             raise OSError()
807         f.flush = bad_flush
808         self.assertRaises(OSError, f.close) # exception not swallowed
809         self.assertTrue(f.closed)
810         self.assertTrue(closed)      # flush() called
811         self.assertFalse(closed[0])  # flush() called before file closed
812         f.flush = lambda: None  # break reference loop
813 
814     def test_flush_error_on_close(self):
815         # raw file
816         # Issue #5700: io.FileIO calls flush() after file closed
817         self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
818         fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
819         self.check_flush_error_on_close(fd, 'wb', buffering=0)
820         fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
821         self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
822         os.close(fd)
823         # buffered io
824         self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
825         fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
826         self.check_flush_error_on_close(fd, 'wb')
827         fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
828         self.check_flush_error_on_close(fd, 'wb', closefd=False)
829         os.close(fd)
830         # text io
831         self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
832         fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
833         self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
834         fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
835         self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
836         os.close(fd)
837 
838     def test_multi_close(self):
839         f = self.open(os_helper.TESTFN, "wb", buffering=0)
840         f.close()
841         f.close()
842         f.close()
843         self.assertRaises(ValueError, f.flush)
844 
845     def test_RawIOBase_read(self):
846         # Exercise the default limited RawIOBase.read(n) implementation (which
847         # calls readinto() internally).
848         rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
849         self.assertEqual(rawio.read(2), b"ab")
850         self.assertEqual(rawio.read(2), b"c")
851         self.assertEqual(rawio.read(2), b"d")
852         self.assertEqual(rawio.read(2), None)
853         self.assertEqual(rawio.read(2), b"ef")
854         self.assertEqual(rawio.read(2), b"g")
855         self.assertEqual(rawio.read(2), None)
856         self.assertEqual(rawio.read(2), b"")
857 
858     def test_types_have_dict(self):
859         test = (
860             self.IOBase(),
861             self.RawIOBase(),
862             self.TextIOBase(),
863             self.StringIO(),
864             self.BytesIO()
865         )
866         for obj in test:
867             self.assertTrue(hasattr(obj, "__dict__"))
868 
869     def test_opener(self):
870         with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
871             f.write("egg\n")
872         fd = os.open(os_helper.TESTFN, os.O_RDONLY)
873         def opener(path, flags):
874             return fd
875         with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
876             self.assertEqual(f.read(), "egg\n")
877 
878     def test_bad_opener_negative_1(self):
879         # Issue #27066.
880         def badopener(fname, flags):
881             return -1
882         with self.assertRaises(ValueError) as cm:
883             open('non-existent', 'r', opener=badopener)
884         self.assertEqual(str(cm.exception), 'opener returned -1')
885 
886     def test_bad_opener_other_negative(self):
887         # Issue #27066.
888         def badopener(fname, flags):
889             return -2
890         with self.assertRaises(ValueError) as cm:
891             open('non-existent', 'r', opener=badopener)
892         self.assertEqual(str(cm.exception), 'opener returned -2')
893 
894     def test_fileio_closefd(self):
895         # Issue #4841
896         with self.open(__file__, 'rb') as f1, \
897              self.open(__file__, 'rb') as f2:
898             fileio = self.FileIO(f1.fileno(), closefd=False)
899             # .__init__() must not close f1
900             fileio.__init__(f2.fileno(), closefd=False)
901             f1.readline()
902             # .close() must not close f2
903             fileio.close()
904             f2.readline()
905 
906     def test_nonbuffered_textio(self):
907         with warnings_helper.check_no_resource_warning(self):
908             with self.assertRaises(ValueError):
909                 self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
910 
911     def test_invalid_newline(self):
912         with warnings_helper.check_no_resource_warning(self):
913             with self.assertRaises(ValueError):
914                 self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
915 
916     def test_buffered_readinto_mixin(self):
917         # Test the implementation provided by BufferedIOBase
918         class Stream(self.BufferedIOBase):
919             def read(self, size):
920                 return b"12345"
921             read1 = read
922         stream = Stream()
923         for method in ("readinto", "readinto1"):
924             with self.subTest(method):
925                 buffer = byteslike(5)
926                 self.assertEqual(getattr(stream, method)(buffer), 5)
927                 self.assertEqual(bytes(buffer), b"12345")
928 
929     def test_fspath_support(self):
930         def check_path_succeeds(path):
931             with self.open(path, "w", encoding="utf-8") as f:
932                 f.write("egg\n")
933 
934             with self.open(path, "r", encoding="utf-8") as f:
935                 self.assertEqual(f.read(), "egg\n")
936 
937         check_path_succeeds(FakePath(os_helper.TESTFN))
938         check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
939 
940         with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
941             bad_path = FakePath(f.fileno())
942             with self.assertRaises(TypeError):
943                 self.open(bad_path, 'w', encoding="utf-8")
944 
945         bad_path = FakePath(None)
946         with self.assertRaises(TypeError):
947             self.open(bad_path, 'w', encoding="utf-8")
948 
949         bad_path = FakePath(FloatingPointError)
950         with self.assertRaises(FloatingPointError):
951             self.open(bad_path, 'w', encoding="utf-8")
952 
953         # ensure that refcounting is correct with some error conditions
954         with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
955             self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
956 
957     def test_RawIOBase_readall(self):
958         # Exercise the default unlimited RawIOBase.read() and readall()
959         # implementations.
960         rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
961         self.assertEqual(rawio.read(), b"abcdefg")
962         rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
963         self.assertEqual(rawio.readall(), b"abcdefg")
964 
965     def test_BufferedIOBase_readinto(self):
966         # Exercise the default BufferedIOBase.readinto() and readinto1()
967         # implementations (which call read() or read1() internally).
968         class Reader(self.BufferedIOBase):
969             def __init__(self, avail):
970                 self.avail = avail
971             def read(self, size):
972                 result = self.avail[:size]
973                 self.avail = self.avail[size:]
974                 return result
975             def read1(self, size):
976                 """Returns no more than 5 bytes at once"""
977                 return self.read(min(size, 5))
978         tests = (
979             # (test method, total data available, read buffer size, expected
980             #     read size)
981             ("readinto", 10, 5, 5),
982             ("readinto", 10, 6, 6),  # More than read1() can return
983             ("readinto", 5, 6, 5),  # Buffer larger than total available
984             ("readinto", 6, 7, 6),
985             ("readinto", 10, 0, 0),  # Empty buffer
986             ("readinto1", 10, 5, 5),  # Result limited to single read1() call
987             ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
988             ("readinto1", 5, 6, 5),  # Buffer larger than total available
989             ("readinto1", 6, 7, 5),
990             ("readinto1", 10, 0, 0),  # Empty buffer
991         )
992         UNUSED_BYTE = 0x81
993         for test in tests:
994             with self.subTest(test):
995                 method, avail, request, result = test
996                 reader = Reader(bytes(range(avail)))
997                 buffer = bytearray((UNUSED_BYTE,) * request)
998                 method = getattr(reader, method)
999                 self.assertEqual(method(buffer), result)
1000                 self.assertEqual(len(buffer), request)
1001                 self.assertSequenceEqual(buffer[:result], range(result))
1002                 unused = (UNUSED_BYTE,) * (request - result)
1003                 self.assertSequenceEqual(buffer[result:], unused)
1004                 self.assertEqual(len(reader.avail), avail - result)
1005 
1006     def test_close_assert(self):
1007         class R(self.IOBase):
1008             def __setattr__(self, name, value):
1009                 pass
1010             def flush(self):
1011                 raise OSError()
1012         f = R()
1013         # This would cause an assertion failure.
1014         self.assertRaises(OSError, f.close)
1015 
1016         # Silence destructor error
1017         R.flush = lambda self: None
1018 
1019 
1020 class CIOTest(IOTest):
1021 
1022     def test_IOBase_finalize(self):
1023         # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1024         # class which inherits IOBase and an object of this class are caught
1025         # in a reference cycle and close() is already in the method cache.
1026         class MyIO(self.IOBase):
1027             def close(self):
1028                 pass
1029 
1030         # create an instance to populate the method cache
1031         MyIO()
1032         obj = MyIO()
1033         obj.obj = obj
1034         wr = weakref.ref(obj)
1035         del MyIO
1036         del obj
1037         support.gc_collect()
1038         self.assertIsNone(wr(), wr)
1039 
1040 class PyIOTest(IOTest):
1041     pass
1042 
1043 
1044 @support.cpython_only
1045 class APIMismatchTest(unittest.TestCase):
1046 
1047     def test_RawIOBase_io_in_pyio_match(self):
1048         """Test that pyio RawIOBase class has all c RawIOBase methods"""
1049         mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1050                                                ignore=('__weakref__',))
1051         self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1052 
1053     def test_RawIOBase_pyio_in_io_match(self):
1054         """Test that c RawIOBase class has all pyio RawIOBase methods"""
1055         mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1056         self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1057 
1058 
1059 class CommonBufferedTests:
1060     # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1061 
1062     def test_detach(self):
1063         raw = self.MockRawIO()
1064         buf = self.tp(raw)
1065         self.assertIs(buf.detach(), raw)
1066         self.assertRaises(ValueError, buf.detach)
1067 
1068         repr(buf)  # Should still work
1069 
1070     def test_fileno(self):
1071         rawio = self.MockRawIO()
1072         bufio = self.tp(rawio)
1073 
1074         self.assertEqual(42, bufio.fileno())
1075 
1076     def test_invalid_args(self):
1077         rawio = self.MockRawIO()
1078         bufio = self.tp(rawio)
1079         # Invalid whence
1080         self.assertRaises(ValueError, bufio.seek, 0, -1)
1081         self.assertRaises(ValueError, bufio.seek, 0, 9)
1082 
1083     def test_override_destructor(self):
1084         tp = self.tp
1085         record = []
1086         class MyBufferedIO(tp):
1087             def __del__(self):
1088                 record.append(1)
1089                 try:
1090                     f = super().__del__
1091                 except AttributeError:
1092                     pass
1093                 else:
1094                     f()
1095             def close(self):
1096                 record.append(2)
1097                 super().close()
1098             def flush(self):
1099                 record.append(3)
1100                 super().flush()
1101         rawio = self.MockRawIO()
1102         bufio = MyBufferedIO(rawio)
1103         del bufio
1104         support.gc_collect()
1105         self.assertEqual(record, [1, 2, 3])
1106 
1107     def test_context_manager(self):
1108         # Test usability as a context manager
1109         rawio = self.MockRawIO()
1110         bufio = self.tp(rawio)
1111         def _with():
1112             with bufio:
1113                 pass
1114         _with()
1115         # bufio should now be closed, and using it a second time should raise
1116         # a ValueError.
1117         self.assertRaises(ValueError, _with)
1118 
1119     def test_error_through_destructor(self):
1120         # Test that the exception state is not modified by a destructor,
1121         # even if close() fails.
1122         rawio = self.CloseFailureIO()
1123         with support.catch_unraisable_exception() as cm:
1124             with self.assertRaises(AttributeError):
1125                 self.tp(rawio).xyzzy
1126 
1127             if not IOBASE_EMITS_UNRAISABLE:
1128                 self.assertIsNone(cm.unraisable)
1129             elif cm.unraisable is not None:
1130                 self.assertEqual(cm.unraisable.exc_type, OSError)
1131 
1132     def test_repr(self):
1133         raw = self.MockRawIO()
1134         b = self.tp(raw)
1135         clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1136         self.assertRegex(repr(b), "<%s>" % clsname)
1137         raw.name = "dummy"
1138         self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
1139         raw.name = b"dummy"
1140         self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
1141 
1142     def test_recursive_repr(self):
1143         # Issue #25455
1144         raw = self.MockRawIO()
1145         b = self.tp(raw)
1146         with support.swap_attr(raw, 'name', b):
1147             try:
1148                 repr(b)  # Should not crash
1149             except RuntimeError:
1150                 pass
1151 
1152     def test_flush_error_on_close(self):
1153         # Test that buffered file is closed despite failed flush
1154         # and that flush() is called before file closed.
1155         raw = self.MockRawIO()
1156         closed = []
1157         def bad_flush():
1158             closed[:] = [b.closed, raw.closed]
1159             raise OSError()
1160         raw.flush = bad_flush
1161         b = self.tp(raw)
1162         self.assertRaises(OSError, b.close) # exception not swallowed
1163         self.assertTrue(b.closed)
1164         self.assertTrue(raw.closed)
1165         self.assertTrue(closed)      # flush() called
1166         self.assertFalse(closed[0])  # flush() called before file closed
1167         self.assertFalse(closed[1])
1168         raw.flush = lambda: None  # break reference loop
1169 
1170     def test_close_error_on_close(self):
1171         raw = self.MockRawIO()
1172         def bad_flush():
1173             raise OSError('flush')
1174         def bad_close():
1175             raise OSError('close')
1176         raw.close = bad_close
1177         b = self.tp(raw)
1178         b.flush = bad_flush
1179         with self.assertRaises(OSError) as err: # exception not swallowed
1180             b.close()
1181         self.assertEqual(err.exception.args, ('close',))
1182         self.assertIsInstance(err.exception.__context__, OSError)
1183         self.assertEqual(err.exception.__context__.args, ('flush',))
1184         self.assertFalse(b.closed)
1185 
1186         # Silence destructor error
1187         raw.close = lambda: None
1188         b.flush = lambda: None
1189 
1190     def test_nonnormalized_close_error_on_close(self):
1191         # Issue #21677
1192         raw = self.MockRawIO()
1193         def bad_flush():
1194             raise non_existing_flush
1195         def bad_close():
1196             raise non_existing_close
1197         raw.close = bad_close
1198         b = self.tp(raw)
1199         b.flush = bad_flush
1200         with self.assertRaises(NameError) as err: # exception not swallowed
1201             b.close()
1202         self.assertIn('non_existing_close', str(err.exception))
1203         self.assertIsInstance(err.exception.__context__, NameError)
1204         self.assertIn('non_existing_flush', str(err.exception.__context__))
1205         self.assertFalse(b.closed)
1206 
1207         # Silence destructor error
1208         b.flush = lambda: None
1209         raw.close = lambda: None
1210 
1211     def test_multi_close(self):
1212         raw = self.MockRawIO()
1213         b = self.tp(raw)
1214         b.close()
1215         b.close()
1216         b.close()
1217         self.assertRaises(ValueError, b.flush)
1218 
1219     def test_unseekable(self):
1220         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1221         self.assertRaises(self.UnsupportedOperation, bufio.tell)
1222         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1223 
1224     def test_readonly_attributes(self):
1225         raw = self.MockRawIO()
1226         buf = self.tp(raw)
1227         x = self.MockRawIO()
1228         with self.assertRaises(AttributeError):
1229             buf.raw = x
1230 
1231 
1232 class SizeofTest:
1233 
1234     @support.cpython_only
1235     def test_sizeof(self):
1236         bufsize1 = 4096
1237         bufsize2 = 8192
1238         rawio = self.MockRawIO()
1239         bufio = self.tp(rawio, buffer_size=bufsize1)
1240         size = sys.getsizeof(bufio) - bufsize1
1241         rawio = self.MockRawIO()
1242         bufio = self.tp(rawio, buffer_size=bufsize2)
1243         self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1244 
1245     @support.cpython_only
1246     def test_buffer_freeing(self) :
1247         bufsize = 4096
1248         rawio = self.MockRawIO()
1249         bufio = self.tp(rawio, buffer_size=bufsize)
1250         size = sys.getsizeof(bufio) - bufsize
1251         bufio.close()
1252         self.assertEqual(sys.getsizeof(bufio), size)
1253 
1254 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1255     read_mode = "rb"
1256 
1257     def test_constructor(self):
1258         rawio = self.MockRawIO([b"abc"])
1259         bufio = self.tp(rawio)
1260         bufio.__init__(rawio)
1261         bufio.__init__(rawio, buffer_size=1024)
1262         bufio.__init__(rawio, buffer_size=16)
1263         self.assertEqual(b"abc", bufio.read())
1264         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1265         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1266         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1267         rawio = self.MockRawIO([b"abc"])
1268         bufio.__init__(rawio)
1269         self.assertEqual(b"abc", bufio.read())
1270 
1271     def test_uninitialized(self):
1272         bufio = self.tp.__new__(self.tp)
1273         del bufio
1274         bufio = self.tp.__new__(self.tp)
1275         self.assertRaisesRegex((ValueError, AttributeError),
1276                                'uninitialized|has no attribute',
1277                                bufio.read, 0)
1278         bufio.__init__(self.MockRawIO())
1279         self.assertEqual(bufio.read(0), b'')
1280 
1281     def test_read(self):
1282         for arg in (None, 7):
1283             rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1284             bufio = self.tp(rawio)
1285             self.assertEqual(b"abcdefg", bufio.read(arg))
1286         # Invalid args
1287         self.assertRaises(ValueError, bufio.read, -2)
1288 
1289     def test_read1(self):
1290         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1291         bufio = self.tp(rawio)
1292         self.assertEqual(b"a", bufio.read(1))
1293         self.assertEqual(b"b", bufio.read1(1))
1294         self.assertEqual(rawio._reads, 1)
1295         self.assertEqual(b"", bufio.read1(0))
1296         self.assertEqual(b"c", bufio.read1(100))
1297         self.assertEqual(rawio._reads, 1)
1298         self.assertEqual(b"d", bufio.read1(100))
1299         self.assertEqual(rawio._reads, 2)
1300         self.assertEqual(b"efg", bufio.read1(100))
1301         self.assertEqual(rawio._reads, 3)
1302         self.assertEqual(b"", bufio.read1(100))
1303         self.assertEqual(rawio._reads, 4)
1304 
1305     def test_read1_arbitrary(self):
1306         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1307         bufio = self.tp(rawio)
1308         self.assertEqual(b"a", bufio.read(1))
1309         self.assertEqual(b"bc", bufio.read1())
1310         self.assertEqual(b"d", bufio.read1())
1311         self.assertEqual(b"efg", bufio.read1(-1))
1312         self.assertEqual(rawio._reads, 3)
1313         self.assertEqual(b"", bufio.read1())
1314         self.assertEqual(rawio._reads, 4)
1315 
1316     def test_readinto(self):
1317         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1318         bufio = self.tp(rawio)
1319         b = bytearray(2)
1320         self.assertEqual(bufio.readinto(b), 2)
1321         self.assertEqual(b, b"ab")
1322         self.assertEqual(bufio.readinto(b), 2)
1323         self.assertEqual(b, b"cd")
1324         self.assertEqual(bufio.readinto(b), 2)
1325         self.assertEqual(b, b"ef")
1326         self.assertEqual(bufio.readinto(b), 1)
1327         self.assertEqual(b, b"gf")
1328         self.assertEqual(bufio.readinto(b), 0)
1329         self.assertEqual(b, b"gf")
1330         rawio = self.MockRawIO((b"abc", None))
1331         bufio = self.tp(rawio)
1332         self.assertEqual(bufio.readinto(b), 2)
1333         self.assertEqual(b, b"ab")
1334         self.assertEqual(bufio.readinto(b), 1)
1335         self.assertEqual(b, b"cb")
1336 
1337     def test_readinto1(self):
1338         buffer_size = 10
1339         rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1340         bufio = self.tp(rawio, buffer_size=buffer_size)
1341         b = bytearray(2)
1342         self.assertEqual(bufio.peek(3), b'abc')
1343         self.assertEqual(rawio._reads, 1)
1344         self.assertEqual(bufio.readinto1(b), 2)
1345         self.assertEqual(b, b"ab")
1346         self.assertEqual(rawio._reads, 1)
1347         self.assertEqual(bufio.readinto1(b), 1)
1348         self.assertEqual(b[:1], b"c")
1349         self.assertEqual(rawio._reads, 1)
1350         self.assertEqual(bufio.readinto1(b), 2)
1351         self.assertEqual(b, b"de")
1352         self.assertEqual(rawio._reads, 2)
1353         b = bytearray(2*buffer_size)
1354         self.assertEqual(bufio.peek(3), b'fgh')
1355         self.assertEqual(rawio._reads, 3)
1356         self.assertEqual(bufio.readinto1(b), 6)
1357         self.assertEqual(b[:6], b"fghjkl")
1358         self.assertEqual(rawio._reads, 4)
1359 
1360     def test_readinto_array(self):
1361         buffer_size = 60
1362         data = b"a" * 26
1363         rawio = self.MockRawIO((data,))
1364         bufio = self.tp(rawio, buffer_size=buffer_size)
1365 
1366         # Create an array with element size > 1 byte
1367         b = array.array('i', b'x' * 32)
1368         assert len(b) != 16
1369 
1370         # Read into it. We should get as many *bytes* as we can fit into b
1371         # (which is more than the number of elements)
1372         n = bufio.readinto(b)
1373         self.assertGreater(n, len(b))
1374 
1375         # Check that old contents of b are preserved
1376         bm = memoryview(b).cast('B')
1377         self.assertLess(n, len(bm))
1378         self.assertEqual(bm[:n], data[:n])
1379         self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1380 
1381     def test_readinto1_array(self):
1382         buffer_size = 60
1383         data = b"a" * 26
1384         rawio = self.MockRawIO((data,))
1385         bufio = self.tp(rawio, buffer_size=buffer_size)
1386 
1387         # Create an array with element size > 1 byte
1388         b = array.array('i', b'x' * 32)
1389         assert len(b) != 16
1390 
1391         # Read into it. We should get as many *bytes* as we can fit into b
1392         # (which is more than the number of elements)
1393         n = bufio.readinto1(b)
1394         self.assertGreater(n, len(b))
1395 
1396         # Check that old contents of b are preserved
1397         bm = memoryview(b).cast('B')
1398         self.assertLess(n, len(bm))
1399         self.assertEqual(bm[:n], data[:n])
1400         self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1401 
1402     def test_readlines(self):
1403         def bufio():
1404             rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1405             return self.tp(rawio)
1406         self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1407         self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1408         self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1409 
1410     def test_buffering(self):
1411         data = b"abcdefghi"
1412         dlen = len(data)
1413 
1414         tests = [
1415             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1416             [ 100, [ 3, 3, 3],     [ dlen ]    ],
1417             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1418         ]
1419 
1420         for bufsize, buf_read_sizes, raw_read_sizes in tests:
1421             rawio = self.MockFileIO(data)
1422             bufio = self.tp(rawio, buffer_size=bufsize)
1423             pos = 0
1424             for nbytes in buf_read_sizes:
1425                 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1426                 pos += nbytes
1427             # this is mildly implementation-dependent
1428             self.assertEqual(rawio.read_history, raw_read_sizes)
1429 
1430     def test_read_non_blocking(self):
1431         # Inject some None's in there to simulate EWOULDBLOCK
1432         rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1433         bufio = self.tp(rawio)
1434         self.assertEqual(b"abcd", bufio.read(6))
1435         self.assertEqual(b"e", bufio.read(1))
1436         self.assertEqual(b"fg", bufio.read())
1437         self.assertEqual(b"", bufio.peek(1))
1438         self.assertIsNone(bufio.read())
1439         self.assertEqual(b"", bufio.read())
1440 
1441         rawio = self.MockRawIO((b"a", None, None))
1442         self.assertEqual(b"a", rawio.readall())
1443         self.assertIsNone(rawio.readall())
1444 
1445     def test_read_past_eof(self):
1446         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1447         bufio = self.tp(rawio)
1448 
1449         self.assertEqual(b"abcdefg", bufio.read(9000))
1450 
1451     def test_read_all(self):
1452         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1453         bufio = self.tp(rawio)
1454 
1455         self.assertEqual(b"abcdefg", bufio.read())
1456 
1457     @support.requires_resource('cpu')
1458     def test_threads(self):
1459         try:
1460             # Write out many bytes with exactly the same number of 0's,
1461             # 1's... 255's. This will help us check that concurrent reading
1462             # doesn't duplicate or forget contents.
1463             N = 1000
1464             l = list(range(256)) * N
1465             random.shuffle(l)
1466             s = bytes(bytearray(l))
1467             with self.open(os_helper.TESTFN, "wb") as f:
1468                 f.write(s)
1469             with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
1470                 bufio = self.tp(raw, 8)
1471                 errors = []
1472                 results = []
1473                 def f():
1474                     try:
1475                         # Intra-buffer read then buffer-flushing read
1476                         for n in cycle([1, 19]):
1477                             s = bufio.read(n)
1478                             if not s:
1479                                 break
1480                             # list.append() is atomic
1481                             results.append(s)
1482                     except Exception as e:
1483                         errors.append(e)
1484                         raise
1485                 threads = [threading.Thread(target=f) for x in range(20)]
1486                 with threading_helper.start_threads(threads):
1487                     time.sleep(0.02) # yield
1488                 self.assertFalse(errors,
1489                     "the following exceptions were caught: %r" % errors)
1490                 s = b''.join(results)
1491                 for i in range(256):
1492                     c = bytes(bytearray([i]))
1493                     self.assertEqual(s.count(c), N)
1494         finally:
1495             os_helper.unlink(os_helper.TESTFN)
1496 
1497     def test_unseekable(self):
1498         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1499         self.assertRaises(self.UnsupportedOperation, bufio.tell)
1500         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1501         bufio.read(1)
1502         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1503         self.assertRaises(self.UnsupportedOperation, bufio.tell)
1504 
1505     def test_misbehaved_io(self):
1506         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1507         bufio = self.tp(rawio)
1508         self.assertRaises(OSError, bufio.seek, 0)
1509         self.assertRaises(OSError, bufio.tell)
1510 
1511         # Silence destructor error
1512         bufio.close = lambda: None
1513 
1514     def test_no_extraneous_read(self):
1515         # Issue #9550; when the raw IO object has satisfied the read request,
1516         # we should not issue any additional reads, otherwise it may block
1517         # (e.g. socket).
1518         bufsize = 16
1519         for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1520             rawio = self.MockRawIO([b"x" * n])
1521             bufio = self.tp(rawio, bufsize)
1522             self.assertEqual(bufio.read(n), b"x" * n)
1523             # Simple case: one raw read is enough to satisfy the request.
1524             self.assertEqual(rawio._extraneous_reads, 0,
1525                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1526             # A more complex case where two raw reads are needed to satisfy
1527             # the request.
1528             rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1529             bufio = self.tp(rawio, bufsize)
1530             self.assertEqual(bufio.read(n), b"x" * n)
1531             self.assertEqual(rawio._extraneous_reads, 0,
1532                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1533 
1534     def test_read_on_closed(self):
1535         # Issue #23796
1536         b = io.BufferedReader(io.BytesIO(b"12"))
1537         b.read(1)
1538         b.close()
1539         self.assertRaises(ValueError, b.peek)
1540         self.assertRaises(ValueError, b.read1, 1)
1541 
1542     def test_truncate_on_read_only(self):
1543         rawio = self.MockFileIO(b"abc")
1544         bufio = self.tp(rawio)
1545         self.assertFalse(bufio.writable())
1546         self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1547         self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1548 
1549 
1550 class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
1551     tp = io.BufferedReader
1552 
1553     @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
1554                      "instead of returning NULL for malloc failure.")
1555     def test_constructor(self):
1556         BufferedReaderTest.test_constructor(self)
1557         # The allocation can succeed on 32-bit builds, e.g. with more
1558         # than 2 GiB RAM and a 64-bit kernel.
1559         if sys.maxsize > 0x7FFFFFFF:
1560             rawio = self.MockRawIO()
1561             bufio = self.tp(rawio)
1562             self.assertRaises((OverflowError, MemoryError, ValueError),
1563                 bufio.__init__, rawio, sys.maxsize)
1564 
1565     def test_initialization(self):
1566         rawio = self.MockRawIO([b"abc"])
1567         bufio = self.tp(rawio)
1568         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1569         self.assertRaises(ValueError, bufio.read)
1570         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1571         self.assertRaises(ValueError, bufio.read)
1572         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1573         self.assertRaises(ValueError, bufio.read)
1574 
1575     def test_misbehaved_io_read(self):
1576         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1577         bufio = self.tp(rawio)
1578         # _pyio.BufferedReader seems to implement reading different, so that
1579         # checking this is not so easy.
1580         self.assertRaises(OSError, bufio.read, 10)
1581 
1582     def test_garbage_collection(self):
1583         # C BufferedReader objects are collected.
1584         # The Python version has __del__, so it ends into gc.garbage instead
1585         self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1586         with warnings_helper.check_warnings(('', ResourceWarning)):
1587             rawio = self.FileIO(os_helper.TESTFN, "w+b")
1588             f = self.tp(rawio)
1589             f.f = f
1590             wr = weakref.ref(f)
1591             del f
1592             support.gc_collect()
1593         self.assertIsNone(wr(), wr)
1594 
1595     def test_args_error(self):
1596         # Issue #17275
1597         with self.assertRaisesRegex(TypeError, "BufferedReader"):
1598             self.tp(io.BytesIO(), 1024, 1024, 1024)
1599 
1600     def test_bad_readinto_value(self):
1601         rawio = io.BufferedReader(io.BytesIO(b"12"))
1602         rawio.readinto = lambda buf: -1
1603         bufio = self.tp(rawio)
1604         with self.assertRaises(OSError) as cm:
1605             bufio.readline()
1606         self.assertIsNone(cm.exception.__cause__)
1607 
1608     def test_bad_readinto_type(self):
1609         rawio = io.BufferedReader(io.BytesIO(b"12"))
1610         rawio.readinto = lambda buf: b''
1611         bufio = self.tp(rawio)
1612         with self.assertRaises(OSError) as cm:
1613             bufio.readline()
1614         self.assertIsInstance(cm.exception.__cause__, TypeError)
1615 
1616 
1617 class PyBufferedReaderTest(BufferedReaderTest):
1618     tp = pyio.BufferedReader
1619 
1620 
1621 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1622     write_mode = "wb"
1623 
1624     def test_constructor(self):
1625         rawio = self.MockRawIO()
1626         bufio = self.tp(rawio)
1627         bufio.__init__(rawio)
1628         bufio.__init__(rawio, buffer_size=1024)
1629         bufio.__init__(rawio, buffer_size=16)
1630         self.assertEqual(3, bufio.write(b"abc"))
1631         bufio.flush()
1632         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1633         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1634         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1635         bufio.__init__(rawio)
1636         self.assertEqual(3, bufio.write(b"ghi"))
1637         bufio.flush()
1638         self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1639 
1640     def test_uninitialized(self):
1641         bufio = self.tp.__new__(self.tp)
1642         del bufio
1643         bufio = self.tp.__new__(self.tp)
1644         self.assertRaisesRegex((ValueError, AttributeError),
1645                                'uninitialized|has no attribute',
1646                                bufio.write, b'')
1647         bufio.__init__(self.MockRawIO())
1648         self.assertEqual(bufio.write(b''), 0)
1649 
1650     def test_detach_flush(self):
1651         raw = self.MockRawIO()
1652         buf = self.tp(raw)
1653         buf.write(b"howdy!")
1654         self.assertFalse(raw._write_stack)
1655         buf.detach()
1656         self.assertEqual(raw._write_stack, [b"howdy!"])
1657 
1658     def test_write(self):
1659         # Write to the buffered IO but don't overflow the buffer.
1660         writer = self.MockRawIO()
1661         bufio = self.tp(writer, 8)
1662         bufio.write(b"abc")
1663         self.assertFalse(writer._write_stack)
1664         buffer = bytearray(b"def")
1665         bufio.write(buffer)
1666         buffer[:] = b"***"  # Overwrite our copy of the data
1667         bufio.flush()
1668         self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1669 
1670     def test_write_overflow(self):
1671         writer = self.MockRawIO()
1672         bufio = self.tp(writer, 8)
1673         contents = b"abcdefghijklmnop"
1674         for n in range(0, len(contents), 3):
1675             bufio.write(contents[n:n+3])
1676         flushed = b"".join(writer._write_stack)
1677         # At least (total - 8) bytes were implicitly flushed, perhaps more
1678         # depending on the implementation.
1679         self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1680 
1681     def check_writes(self, intermediate_func):
1682         # Lots of writes, test the flushed output is as expected.
1683         contents = bytes(range(256)) * 1000
1684         n = 0
1685         writer = self.MockRawIO()
1686         bufio = self.tp(writer, 13)
1687         # Generator of write sizes: repeat each N 15 times then proceed to N+1
1688         def gen_sizes():
1689             for size in count(1):
1690                 for i in range(15):
1691                     yield size
1692         sizes = gen_sizes()
1693         while n < len(contents):
1694             size = min(next(sizes), len(contents) - n)
1695             self.assertEqual(bufio.write(contents[n:n+size]), size)
1696             intermediate_func(bufio)
1697             n += size
1698         bufio.flush()
1699         self.assertEqual(contents, b"".join(writer._write_stack))
1700 
1701     def test_writes(self):
1702         self.check_writes(lambda bufio: None)
1703 
1704     def test_writes_and_flushes(self):
1705         self.check_writes(lambda bufio: bufio.flush())
1706 
1707     def test_writes_and_seeks(self):
1708         def _seekabs(bufio):
1709             pos = bufio.tell()
1710             bufio.seek(pos + 1, 0)
1711             bufio.seek(pos - 1, 0)
1712             bufio.seek(pos, 0)
1713         self.check_writes(_seekabs)
1714         def _seekrel(bufio):
1715             pos = bufio.seek(0, 1)
1716             bufio.seek(+1, 1)
1717             bufio.seek(-1, 1)
1718             bufio.seek(pos, 0)
1719         self.check_writes(_seekrel)
1720 
1721     def test_writes_and_truncates(self):
1722         self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1723 
1724     def test_write_non_blocking(self):
1725         raw = self.MockNonBlockWriterIO()
1726         bufio = self.tp(raw, 8)
1727 
1728         self.assertEqual(bufio.write(b"abcd"), 4)
1729         self.assertEqual(bufio.write(b"efghi"), 5)
1730         # 1 byte will be written, the rest will be buffered
1731         raw.block_on(b"k")
1732         self.assertEqual(bufio.write(b"jklmn"), 5)
1733 
1734         # 8 bytes will be written, 8 will be buffered and the rest will be lost
1735         raw.block_on(b"0")
1736         try:
1737             bufio.write(b"opqrwxyz0123456789")
1738         except self.BlockingIOError as e:
1739             written = e.characters_written
1740         else:
1741             self.fail("BlockingIOError should have been raised")
1742         self.assertEqual(written, 16)
1743         self.assertEqual(raw.pop_written(),
1744             b"abcdefghijklmnopqrwxyz")
1745 
1746         self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1747         s = raw.pop_written()
1748         # Previously buffered bytes were flushed
1749         self.assertTrue(s.startswith(b"01234567A"), s)
1750 
1751     def test_write_and_rewind(self):
1752         raw = io.BytesIO()
1753         bufio = self.tp(raw, 4)
1754         self.assertEqual(bufio.write(b"abcdef"), 6)
1755         self.assertEqual(bufio.tell(), 6)
1756         bufio.seek(0, 0)
1757         self.assertEqual(bufio.write(b"XY"), 2)
1758         bufio.seek(6, 0)
1759         self.assertEqual(raw.getvalue(), b"XYcdef")
1760         self.assertEqual(bufio.write(b"123456"), 6)
1761         bufio.flush()
1762         self.assertEqual(raw.getvalue(), b"XYcdef123456")
1763 
1764     def test_flush(self):
1765         writer = self.MockRawIO()
1766         bufio = self.tp(writer, 8)
1767         bufio.write(b"abc")
1768         bufio.flush()
1769         self.assertEqual(b"abc", writer._write_stack[0])
1770 
1771     def test_writelines(self):
1772         l = [b'ab', b'cd', b'ef']
1773         writer = self.MockRawIO()
1774         bufio = self.tp(writer, 8)
1775         bufio.writelines(l)
1776         bufio.flush()
1777         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1778 
1779     def test_writelines_userlist(self):
1780         l = UserList([b'ab', b'cd', b'ef'])
1781         writer = self.MockRawIO()
1782         bufio = self.tp(writer, 8)
1783         bufio.writelines(l)
1784         bufio.flush()
1785         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1786 
1787     def test_writelines_error(self):
1788         writer = self.MockRawIO()
1789         bufio = self.tp(writer, 8)
1790         self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1791         self.assertRaises(TypeError, bufio.writelines, None)
1792         self.assertRaises(TypeError, bufio.writelines, 'abc')
1793 
1794     def test_destructor(self):
1795         writer = self.MockRawIO()
1796         bufio = self.tp(writer, 8)
1797         bufio.write(b"abc")
1798         del bufio
1799         support.gc_collect()
1800         self.assertEqual(b"abc", writer._write_stack[0])
1801 
1802     def test_truncate(self):
1803         # Truncate implicitly flushes the buffer.
1804         self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1805         with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
1806             bufio = self.tp(raw, 8)
1807             bufio.write(b"abcdef")
1808             self.assertEqual(bufio.truncate(3), 3)
1809             self.assertEqual(bufio.tell(), 6)
1810         with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
1811             self.assertEqual(f.read(), b"abc")
1812 
1813     def test_truncate_after_write(self):
1814         # Ensure that truncate preserves the file position after
1815         # writes longer than the buffer size.
1816         # Issue: https://bugs.python.org/issue32228
1817         self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1818         with self.open(os_helper.TESTFN, "wb") as f:
1819             # Fill with some buffer
1820             f.write(b'\x00' * 10000)
1821         buffer_sizes = [8192, 4096, 200]
1822         for buffer_size in buffer_sizes:
1823             with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
1824                 f.write(b'\x00' * (buffer_size + 1))
1825                 # After write write_pos and write_end are set to 0
1826                 f.read(1)
1827                 # read operation makes sure that pos != raw_pos
1828                 f.truncate()
1829                 self.assertEqual(f.tell(), buffer_size + 2)
1830 
1831     @support.requires_resource('cpu')
1832     def test_threads(self):
1833         try:
1834             # Write out many bytes from many threads and test they were
1835             # all flushed.
1836             N = 1000
1837             contents = bytes(range(256)) * N
1838             sizes = cycle([1, 19])
1839             n = 0
1840             queue = deque()
1841             while n < len(contents):
1842                 size = next(sizes)
1843                 queue.append(contents[n:n+size])
1844                 n += size
1845             del contents
1846             # We use a real file object because it allows us to
1847             # exercise situations where the GIL is released before
1848             # writing the buffer to the raw streams. This is in addition
1849             # to concurrency issues due to switching threads in the middle
1850             # of Python code.
1851             with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
1852                 bufio = self.tp(raw, 8)
1853                 errors = []
1854                 def f():
1855                     try:
1856                         while True:
1857                             try:
1858                                 s = queue.popleft()
1859                             except IndexError:
1860                                 return
1861                             bufio.write(s)
1862                     except Exception as e:
1863                         errors.append(e)
1864                         raise
1865                 threads = [threading.Thread(target=f) for x in range(20)]
1866                 with threading_helper.start_threads(threads):
1867                     time.sleep(0.02) # yield
1868                 self.assertFalse(errors,
1869                     "the following exceptions were caught: %r" % errors)
1870                 bufio.close()
1871             with self.open(os_helper.TESTFN, "rb") as f:
1872                 s = f.read()
1873             for i in range(256):
1874                 self.assertEqual(s.count(bytes([i])), N)
1875         finally:
1876             os_helper.unlink(os_helper.TESTFN)
1877 
1878     def test_misbehaved_io(self):
1879         rawio = self.MisbehavedRawIO()
1880         bufio = self.tp(rawio, 5)
1881         self.assertRaises(OSError, bufio.seek, 0)
1882         self.assertRaises(OSError, bufio.tell)
1883         self.assertRaises(OSError, bufio.write, b"abcdef")
1884 
1885         # Silence destructor error
1886         bufio.close = lambda: None
1887 
1888     def test_max_buffer_size_removal(self):
1889         with self.assertRaises(TypeError):
1890             self.tp(self.MockRawIO(), 8, 12)
1891 
1892     def test_write_error_on_close(self):
1893         raw = self.MockRawIO()
1894         def bad_write(b):
1895             raise OSError()
1896         raw.write = bad_write
1897         b = self.tp(raw)
1898         b.write(b'spam')
1899         self.assertRaises(OSError, b.close) # exception not swallowed
1900         self.assertTrue(b.closed)
1901 
1902     def test_slow_close_from_thread(self):
1903         # Issue #31976
1904         rawio = self.SlowFlushRawIO()
1905         bufio = self.tp(rawio, 8)
1906         t = threading.Thread(target=bufio.close)
1907         t.start()
1908         rawio.in_flush.wait()
1909         self.assertRaises(ValueError, bufio.write, b'spam')
1910         self.assertTrue(bufio.closed)
1911         t.join()
1912 
1913 
1914 
1915 class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1916     tp = io.BufferedWriter
1917 
1918     @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
1919                      "instead of returning NULL for malloc failure.")
1920     def test_constructor(self):
1921         BufferedWriterTest.test_constructor(self)
1922         # The allocation can succeed on 32-bit builds, e.g. with more
1923         # than 2 GiB RAM and a 64-bit kernel.
1924         if sys.maxsize > 0x7FFFFFFF:
1925             rawio = self.MockRawIO()
1926             bufio = self.tp(rawio)
1927             self.assertRaises((OverflowError, MemoryError, ValueError),
1928                 bufio.__init__, rawio, sys.maxsize)
1929 
1930     def test_initialization(self):
1931         rawio = self.MockRawIO()
1932         bufio = self.tp(rawio)
1933         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1934         self.assertRaises(ValueError, bufio.write, b"def")
1935         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1936         self.assertRaises(ValueError, bufio.write, b"def")
1937         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1938         self.assertRaises(ValueError, bufio.write, b"def")
1939 
1940     def test_garbage_collection(self):
1941         # C BufferedWriter objects are collected, and collecting them flushes
1942         # all data to disk.
1943         # The Python version has __del__, so it ends into gc.garbage instead
1944         self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1945         with warnings_helper.check_warnings(('', ResourceWarning)):
1946             rawio = self.FileIO(os_helper.TESTFN, "w+b")
1947             f = self.tp(rawio)
1948             f.write(b"123xxx")
1949             f.x = f
1950             wr = weakref.ref(f)
1951             del f
1952             support.gc_collect()
1953         self.assertIsNone(wr(), wr)
1954         with self.open(os_helper.TESTFN, "rb") as f:
1955             self.assertEqual(f.read(), b"123xxx")
1956 
1957     def test_args_error(self):
1958         # Issue #17275
1959         with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1960             self.tp(io.BytesIO(), 1024, 1024, 1024)
1961 
1962 
1963 class PyBufferedWriterTest(BufferedWriterTest):
1964     tp = pyio.BufferedWriter
1965 
1966 class BufferedRWPairTest(unittest.TestCase):
1967 
1968     def test_constructor(self):
1969         pair = self.tp(self.MockRawIO(), self.MockRawIO())
1970         self.assertFalse(pair.closed)
1971 
1972     def test_uninitialized(self):
1973         pair = self.tp.__new__(self.tp)
1974         del pair
1975         pair = self.tp.__new__(self.tp)
1976         self.assertRaisesRegex((ValueError, AttributeError),
1977                                'uninitialized|has no attribute',
1978                                pair.read, 0)
1979         self.assertRaisesRegex((ValueError, AttributeError),
1980                                'uninitialized|has no attribute',
1981                                pair.write, b'')
1982         pair.__init__(self.MockRawIO(), self.MockRawIO())
1983         self.assertEqual(pair.read(0), b'')
1984         self.assertEqual(pair.write(b''), 0)
1985 
1986     def test_detach(self):
1987         pair = self.tp(self.MockRawIO(), self.MockRawIO())
1988         self.assertRaises(self.UnsupportedOperation, pair.detach)
1989 
1990     def test_constructor_max_buffer_size_removal(self):
1991         with self.assertRaises(TypeError):
1992             self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1993 
1994     def test_constructor_with_not_readable(self):
1995         class NotReadable(MockRawIO):
1996             def readable(self):
1997                 return False
1998 
1999         self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
2000 
2001     def test_constructor_with_not_writeable(self):
2002         class NotWriteable(MockRawIO):
2003             def writable(self):
2004                 return False
2005 
2006         self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
2007 
2008     def test_read(self):
2009         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2010 
2011         self.assertEqual(pair.read(3), b"abc")
2012         self.assertEqual(pair.read(1), b"d")
2013         self.assertEqual(pair.read(), b"ef")
2014         pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2015         self.assertEqual(pair.read(None), b"abc")
2016 
2017     def test_readlines(self):
2018         pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2019         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2020         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2021         self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
2022 
2023     def test_read1(self):
2024         # .read1() is delegated to the underlying reader object, so this test
2025         # can be shallow.
2026         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2027 
2028         self.assertEqual(pair.read1(3), b"abc")
2029         self.assertEqual(pair.read1(), b"def")
2030 
2031     def test_readinto(self):
2032         for method in ("readinto", "readinto1"):
2033             with self.subTest(method):
2034                 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2035 
2036                 data = byteslike(b'\0' * 5)
2037                 self.assertEqual(getattr(pair, method)(data), 5)
2038                 self.assertEqual(bytes(data), b"abcde")
2039 
2040     def test_write(self):
2041         w = self.MockRawIO()
2042         pair = self.tp(self.MockRawIO(), w)
2043 
2044         pair.write(b"abc")
2045         pair.flush()
2046         buffer = bytearray(b"def")
2047         pair.write(buffer)
2048         buffer[:] = b"***"  # Overwrite our copy of the data
2049         pair.flush()
2050         self.assertEqual(w._write_stack, [b"abc", b"def"])
2051 
2052     def test_peek(self):
2053         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2054 
2055         self.assertTrue(pair.peek(3).startswith(b"abc"))
2056         self.assertEqual(pair.read(3), b"abc")
2057 
2058     def test_readable(self):
2059         pair = self.tp(self.MockRawIO(), self.MockRawIO())
2060         self.assertTrue(pair.readable())
2061 
2062     def test_writeable(self):
2063         pair = self.tp(self.MockRawIO(), self.MockRawIO())
2064         self.assertTrue(pair.writable())
2065 
2066     def test_seekable(self):
2067         # BufferedRWPairs are never seekable, even if their readers and writers
2068         # are.
2069         pair = self.tp(self.MockRawIO(), self.MockRawIO())
2070         self.assertFalse(pair.seekable())
2071 
2072     # .flush() is delegated to the underlying writer object and has been
2073     # tested in the test_write method.
2074 
2075     def test_close_and_closed(self):
2076         pair = self.tp(self.MockRawIO(), self.MockRawIO())
2077         self.assertFalse(pair.closed)
2078         pair.close()
2079         self.assertTrue(pair.closed)
2080 
2081     def test_reader_close_error_on_close(self):
2082         def reader_close():
2083             reader_non_existing
2084         reader = self.MockRawIO()
2085         reader.close = reader_close
2086         writer = self.MockRawIO()
2087         pair = self.tp(reader, writer)
2088         with self.assertRaises(NameError) as err:
2089             pair.close()
2090         self.assertIn('reader_non_existing', str(err.exception))
2091         self.assertTrue(pair.closed)
2092         self.assertFalse(reader.closed)
2093         self.assertTrue(writer.closed)
2094 
2095         # Silence destructor error
2096         reader.close = lambda: None
2097 
2098     def test_writer_close_error_on_close(self):
2099         def writer_close():
2100             writer_non_existing
2101         reader = self.MockRawIO()
2102         writer = self.MockRawIO()
2103         writer.close = writer_close
2104         pair = self.tp(reader, writer)
2105         with self.assertRaises(NameError) as err:
2106             pair.close()
2107         self.assertIn('writer_non_existing', str(err.exception))
2108         self.assertFalse(pair.closed)
2109         self.assertTrue(reader.closed)
2110         self.assertFalse(writer.closed)
2111 
2112         # Silence destructor error
2113         writer.close = lambda: None
2114         writer = None
2115 
2116         # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
2117         with support.catch_unraisable_exception():
2118             # Ignore BufferedRWPair unraisable exception
2119             with support.catch_unraisable_exception():
2120                 pair = None
2121                 support.gc_collect()
2122             support.gc_collect()
2123 
2124     def test_reader_writer_close_error_on_close(self):
2125         def reader_close():
2126             reader_non_existing
2127         def writer_close():
2128             writer_non_existing
2129         reader = self.MockRawIO()
2130         reader.close = reader_close
2131         writer = self.MockRawIO()
2132         writer.close = writer_close
2133         pair = self.tp(reader, writer)
2134         with self.assertRaises(NameError) as err:
2135             pair.close()
2136         self.assertIn('reader_non_existing', str(err.exception))
2137         self.assertIsInstance(err.exception.__context__, NameError)
2138         self.assertIn('writer_non_existing', str(err.exception.__context__))
2139         self.assertFalse(pair.closed)
2140         self.assertFalse(reader.closed)
2141         self.assertFalse(writer.closed)
2142 
2143         # Silence destructor error
2144         reader.close = lambda: None
2145         writer.close = lambda: None
2146 
2147     def test_isatty(self):
2148         class SelectableIsAtty(MockRawIO):
2149             def __init__(self, isatty):
2150                 MockRawIO.__init__(self)
2151                 self._isatty = isatty
2152 
2153             def isatty(self):
2154                 return self._isatty
2155 
2156         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2157         self.assertFalse(pair.isatty())
2158 
2159         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2160         self.assertTrue(pair.isatty())
2161 
2162         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2163         self.assertTrue(pair.isatty())
2164 
2165         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2166         self.assertTrue(pair.isatty())
2167 
2168     def test_weakref_clearing(self):
2169         brw = self.tp(self.MockRawIO(), self.MockRawIO())
2170         ref = weakref.ref(brw)
2171         brw = None
2172         ref = None # Shouldn't segfault.
2173 
2174 class CBufferedRWPairTest(BufferedRWPairTest):
2175     tp = io.BufferedRWPair
2176 
2177 class PyBufferedRWPairTest(BufferedRWPairTest):
2178     tp = pyio.BufferedRWPair
2179 
2180 
2181 class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2182     read_mode = "rb+"
2183     write_mode = "wb+"
2184 
2185     def test_constructor(self):
2186         BufferedReaderTest.test_constructor(self)
2187         BufferedWriterTest.test_constructor(self)
2188 
2189     def test_uninitialized(self):
2190         BufferedReaderTest.test_uninitialized(self)
2191         BufferedWriterTest.test_uninitialized(self)
2192 
2193     def test_read_and_write(self):
2194         raw = self.MockRawIO((b"asdf", b"ghjk"))
2195         rw = self.tp(raw, 8)
2196 
2197         self.assertEqual(b"as", rw.read(2))
2198         rw.write(b"ddd")
2199         rw.write(b"eee")
2200         self.assertFalse(raw._write_stack) # Buffer writes
2201         self.assertEqual(b"ghjk", rw.read())
2202         self.assertEqual(b"dddeee", raw._write_stack[0])
2203 
2204     def test_seek_and_tell(self):
2205         raw = self.BytesIO(b"asdfghjkl")
2206         rw = self.tp(raw)
2207 
2208         self.assertEqual(b"as", rw.read(2))
2209         self.assertEqual(2, rw.tell())
2210         rw.seek(0, 0)
2211         self.assertEqual(b"asdf", rw.read(4))
2212 
2213         rw.write(b"123f")
2214         rw.seek(0, 0)
2215         self.assertEqual(b"asdf123fl", rw.read())
2216         self.assertEqual(9, rw.tell())
2217         rw.seek(-4, 2)
2218         self.assertEqual(5, rw.tell())
2219         rw.seek(2, 1)
2220         self.assertEqual(7, rw.tell())
2221         self.assertEqual(b"fl", rw.read(11))
2222         rw.flush()
2223         self.assertEqual(b"asdf123fl", raw.getvalue())
2224 
2225         self.assertRaises(TypeError, rw.seek, 0.0)
2226 
2227     def check_flush_and_read(self, read_func):
2228         raw = self.BytesIO(b"abcdefghi")
2229         bufio = self.tp(raw)
2230 
2231         self.assertEqual(b"ab", read_func(bufio, 2))
2232         bufio.write(b"12")
2233         self.assertEqual(b"ef", read_func(bufio, 2))
2234         self.assertEqual(6, bufio.tell())
2235         bufio.flush()
2236         self.assertEqual(6, bufio.tell())
2237         self.assertEqual(b"ghi", read_func(bufio))
2238         raw.seek(0, 0)
2239         raw.write(b"XYZ")
2240         # flush() resets the read buffer
2241         bufio.flush()
2242         bufio.seek(0, 0)
2243         self.assertEqual(b"XYZ", read_func(bufio, 3))
2244 
2245     def test_flush_and_read(self):
2246         self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2247 
2248     def test_flush_and_readinto(self):
2249         def _readinto(bufio, n=-1):
2250             b = bytearray(n if n >= 0 else 9999)
2251             n = bufio.readinto(b)
2252             return bytes(b[:n])
2253         self.check_flush_and_read(_readinto)
2254 
2255     def test_flush_and_peek(self):
2256         def _peek(bufio, n=-1):
2257             # This relies on the fact that the buffer can contain the whole
2258             # raw stream, otherwise peek() can return less.
2259             b = bufio.peek(n)
2260             if n != -1:
2261                 b = b[:n]
2262             bufio.seek(len(b), 1)
2263             return b
2264         self.check_flush_and_read(_peek)
2265 
2266     def test_flush_and_write(self):
2267         raw = self.BytesIO(b"abcdefghi")
2268         bufio = self.tp(raw)
2269 
2270         bufio.write(b"123")
2271         bufio.flush()
2272         bufio.write(b"45")
2273         bufio.flush()
2274         bufio.seek(0, 0)
2275         self.assertEqual(b"12345fghi", raw.getvalue())
2276         self.assertEqual(b"12345fghi", bufio.read())
2277 
2278     def test_threads(self):
2279         BufferedReaderTest.test_threads(self)
2280         BufferedWriterTest.test_threads(self)
2281 
2282     def test_writes_and_peek(self):
2283         def _peek(bufio):
2284             bufio.peek(1)
2285         self.check_writes(_peek)
2286         def _peek(bufio):
2287             pos = bufio.tell()
2288             bufio.seek(-1, 1)
2289             bufio.peek(1)
2290             bufio.seek(pos, 0)
2291         self.check_writes(_peek)
2292 
2293     def test_writes_and_reads(self):
2294         def _read(bufio):
2295             bufio.seek(-1, 1)
2296             bufio.read(1)
2297         self.check_writes(_read)
2298 
2299     def test_writes_and_read1s(self):
2300         def _read1(bufio):
2301             bufio.seek(-1, 1)
2302             bufio.read1(1)
2303         self.check_writes(_read1)
2304 
2305     def test_writes_and_readintos(self):
2306         def _read(bufio):
2307             bufio.seek(-1, 1)
2308             bufio.readinto(bytearray(1))
2309         self.check_writes(_read)
2310 
2311     def test_write_after_readahead(self):
2312         # Issue #6629: writing after the buffer was filled by readahead should
2313         # first rewind the raw stream.
2314         for overwrite_size in [1, 5]:
2315             raw = self.BytesIO(b"A" * 10)
2316             bufio = self.tp(raw, 4)
2317             # Trigger readahead
2318             self.assertEqual(bufio.read(1), b"A")
2319             self.assertEqual(bufio.tell(), 1)
2320             # Overwriting should rewind the raw stream if it needs so
2321             bufio.write(b"B" * overwrite_size)
2322             self.assertEqual(bufio.tell(), overwrite_size + 1)
2323             # If the write size was smaller than the buffer size, flush() and
2324             # check that rewind happens.
2325             bufio.flush()
2326             self.assertEqual(bufio.tell(), overwrite_size + 1)
2327             s = raw.getvalue()
2328             self.assertEqual(s,
2329                 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2330 
2331     def test_write_rewind_write(self):
2332         # Various combinations of reading / writing / seeking backwards / writing again
2333         def mutate(bufio, pos1, pos2):
2334             assert pos2 >= pos1
2335             # Fill the buffer
2336             bufio.seek(pos1)
2337             bufio.read(pos2 - pos1)
2338             bufio.write(b'\x02')
2339             # This writes earlier than the previous write, but still inside
2340             # the buffer.
2341             bufio.seek(pos1)
2342             bufio.write(b'\x01')
2343 
2344         b = b"\x80\x81\x82\x83\x84"
2345         for i in range(0, len(b)):
2346             for j in range(i, len(b)):
2347                 raw = self.BytesIO(b)
2348                 bufio = self.tp(raw, 100)
2349                 mutate(bufio, i, j)
2350                 bufio.flush()
2351                 expected = bytearray(b)
2352                 expected[j] = 2
2353                 expected[i] = 1
2354                 self.assertEqual(raw.getvalue(), expected,
2355                                  "failed result for i=%d, j=%d" % (i, j))
2356 
2357     def test_truncate_after_read_or_write(self):
2358         raw = self.BytesIO(b"A" * 10)
2359         bufio = self.tp(raw, 100)
2360         self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2361         self.assertEqual(bufio.truncate(), 2)
2362         self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2363         self.assertEqual(bufio.truncate(), 4)
2364 
2365     def test_misbehaved_io(self):
2366         BufferedReaderTest.test_misbehaved_io(self)
2367         BufferedWriterTest.test_misbehaved_io(self)
2368 
2369     def test_interleaved_read_write(self):
2370         # Test for issue #12213
2371         with self.BytesIO(b'abcdefgh') as raw:
2372             with self.tp(raw, 100) as f:
2373                 f.write(b"1")
2374                 self.assertEqual(f.read(1), b'b')
2375                 f.write(b'2')
2376                 self.assertEqual(f.read1(1), b'd')
2377                 f.write(b'3')
2378                 buf = bytearray(1)
2379                 f.readinto(buf)
2380                 self.assertEqual(buf, b'f')
2381                 f.write(b'4')
2382                 self.assertEqual(f.peek(1), b'h')
2383                 f.flush()
2384                 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2385 
2386         with self.BytesIO(b'abc') as raw:
2387             with self.tp(raw, 100) as f:
2388                 self.assertEqual(f.read(1), b'a')
2389                 f.write(b"2")
2390                 self.assertEqual(f.read(1), b'c')
2391                 f.flush()
2392                 self.assertEqual(raw.getvalue(), b'a2c')
2393 
2394     def test_interleaved_readline_write(self):
2395         with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2396             with self.tp(raw) as f:
2397                 f.write(b'1')
2398                 self.assertEqual(f.readline(), b'b\n')
2399                 f.write(b'2')
2400                 self.assertEqual(f.readline(), b'def\n')
2401                 f.write(b'3')
2402                 self.assertEqual(f.readline(), b'\n')
2403                 f.flush()
2404                 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2405 
2406     # You can't construct a BufferedRandom over a non-seekable stream.
2407     test_unseekable = None
2408 
2409     # writable() returns True, so there's no point to test it over
2410     # a writable stream.
2411     test_truncate_on_read_only = None
2412 
2413 
2414 class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
2415     tp = io.BufferedRandom
2416 
2417     @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
2418                      "instead of returning NULL for malloc failure.")
2419     def test_constructor(self):
2420         BufferedRandomTest.test_constructor(self)
2421         # The allocation can succeed on 32-bit builds, e.g. with more
2422         # than 2 GiB RAM and a 64-bit kernel.
2423         if sys.maxsize > 0x7FFFFFFF:
2424             rawio = self.MockRawIO()
2425             bufio = self.tp(rawio)
2426             self.assertRaises((OverflowError, MemoryError, ValueError),
2427                 bufio.__init__, rawio, sys.maxsize)
2428 
2429     def test_garbage_collection(self):
2430         CBufferedReaderTest.test_garbage_collection(self)
2431         CBufferedWriterTest.test_garbage_collection(self)
2432 
2433     def test_args_error(self):
2434         # Issue #17275
2435         with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2436             self.tp(io.BytesIO(), 1024, 1024, 1024)
2437 
2438 
2439 class PyBufferedRandomTest(BufferedRandomTest):
2440     tp = pyio.BufferedRandom
2441 
2442 
2443 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2444 # properties:
2445 #   - A single output character can correspond to many bytes of input.
2446 #   - The number of input bytes to complete the character can be
2447 #     undetermined until the last input byte is received.
2448 #   - The number of input bytes can vary depending on previous input.
2449 #   - A single input byte can correspond to many characters of output.
2450 #   - The number of output characters can be undetermined until the
2451 #     last input byte is received.
2452 #   - The number of output characters can vary depending on previous input.
2453 
2454 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2455     """
2456     For testing seek/tell behavior with a stateful, buffering decoder.
2457 
2458     Input is a sequence of words.  Words may be fixed-length (length set
2459     by input) or variable-length (period-terminated).  In variable-length
2460     mode, extra periods are ignored.  Possible words are:
2461       - 'i' followed by a number sets the input length, I (maximum 99).
2462         When I is set to 0, words are space-terminated.
2463       - 'o' followed by a number sets the output length, O (maximum 99).
2464       - Any other word is converted into a word followed by a period on
2465         the output.  The output word consists of the input word truncated
2466         or padded out with hyphens to make its length equal to O.  If O
2467         is 0, the word is output verbatim without truncating or padding.
2468     I and O are initially set to 1.  When I changes, any buffered input is
2469     re-scanned according to the new I.  EOF also terminates the last word.
2470     """
2471 
2472     def __init__(self, errors='strict'):
2473         codecs.IncrementalDecoder.__init__(self, errors)
2474         self.reset()
2475 
2476     def __repr__(self):
2477         return '<SID %x>' % id(self)
2478 
2479     def reset(self):
2480         self.i = 1
2481         self.o = 1
2482         self.buffer = bytearray()
2483 
2484     def getstate(self):
2485         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2486         return bytes(self.buffer), i*100 + o
2487 
2488     def setstate(self, state):
2489         buffer, io = state
2490         self.buffer = bytearray(buffer)
2491         i, o = divmod(io, 100)
2492         self.i, self.o = i ^ 1, o ^ 1
2493 
2494     def decode(self, input, final=False):
2495         output = ''
2496         for b in input:
2497             if self.i == 0: # variable-length, terminated with period
2498                 if b == ord('.'):
2499                     if self.buffer:
2500                         output += self.process_word()
2501                 else:
2502                     self.buffer.append(b)
2503             else: # fixed-length, terminate after self.i bytes
2504                 self.buffer.append(b)
2505                 if len(self.buffer) == self.i:
2506                     output += self.process_word()
2507         if final and self.buffer: # EOF terminates the last word
2508             output += self.process_word()
2509         return output
2510 
2511     def process_word(self):
2512         output = ''
2513         if self.buffer[0] == ord('i'):
2514             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2515         elif self.buffer[0] == ord('o'):
2516             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2517         else:
2518             output = self.buffer.decode('ascii')
2519             if len(output) < self.o:
2520                 output += '-'*self.o # pad out with hyphens
2521             if self.o:
2522                 output = output[:self.o] # truncate to output length
2523             output += '.'
2524         self.buffer = bytearray()
2525         return output
2526 
2527     codecEnabled = False
2528 
2529 
2530 # bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
2531 # when registering codecs and cleanup functions.
2532 def lookupTestDecoder(name):
2533     if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
2534         latin1 = codecs.lookup('latin-1')
2535         return codecs.CodecInfo(
2536             name='test_decoder', encode=latin1.encode, decode=None,
2537             incrementalencoder=None,
2538             streamreader=None, streamwriter=None,
2539             incrementaldecoder=StatefulIncrementalDecoder)
2540 
2541 
2542 class StatefulIncrementalDecoderTest(unittest.TestCase):
2543     """
2544     Make sure the StatefulIncrementalDecoder actually works.
2545     """
2546 
2547     test_cases = [
2548         # I=1, O=1 (fixed-length input == fixed-length output)
2549         (b'abcd', False, 'a.b.c.d.'),
2550         # I=0, O=0 (variable-length input, variable-length output)
2551         (b'oiabcd', True, 'abcd.'),
2552         # I=0, O=0 (should ignore extra periods)
2553         (b'oi...abcd...', True, 'abcd.'),
2554         # I=0, O=6 (variable-length input, fixed-length output)
2555         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2556         # I=2, O=6 (fixed-length input < fixed-length output)
2557         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2558         # I=6, O=3 (fixed-length input > fixed-length output)
2559         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2560         # I=0, then 3; O=29, then 15 (with longer output)
2561         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2562          'a----------------------------.' +
2563          'b----------------------------.' +
2564          'cde--------------------------.' +
2565          'abcdefghijabcde.' +
2566          'a.b------------.' +
2567          '.c.------------.' +
2568          'd.e------------.' +
2569          'k--------------.' +
2570          'l--------------.' +
2571          'm--------------.')
2572     ]
2573 
2574     def test_decoder(self):
2575         # Try a few one-shot test cases.
2576         for input, eof, output in self.test_cases:
2577             d = StatefulIncrementalDecoder()
2578             self.assertEqual(d.decode(input, eof), output)
2579 
2580         # Also test an unfinished decode, followed by forcing EOF.
2581         d = StatefulIncrementalDecoder()
2582         self.assertEqual(d.decode(b'oiabcd'), '')
2583         self.assertEqual(d.decode(b'', 1), 'abcd.')
2584 
2585 class TextIOWrapperTest(unittest.TestCase):
2586 
2587     def setUp(self):
2588         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2589         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2590         os_helper.unlink(os_helper.TESTFN)
2591         codecs.register(lookupTestDecoder)
2592         self.addCleanup(codecs.unregister, lookupTestDecoder)
2593 
2594     def tearDown(self):
2595         os_helper.unlink(os_helper.TESTFN)
2596 
2597     def test_constructor(self):
2598         r = self.BytesIO(b"\xc3\xa9\n\n")
2599         b = self.BufferedReader(r, 1000)
2600         t = self.TextIOWrapper(b, encoding="utf-8")
2601         t.__init__(b, encoding="latin-1", newline="\r\n")
2602         self.assertEqual(t.encoding, "latin-1")
2603         self.assertEqual(t.line_buffering, False)
2604         t.__init__(b, encoding="utf-8", line_buffering=True)
2605         self.assertEqual(t.encoding, "utf-8")
2606         self.assertEqual(t.line_buffering, True)
2607         self.assertEqual("\xe9\n", t.readline())
2608         self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
2609         self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
2610 
2611     def test_uninitialized(self):
2612         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2613         del t
2614         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2615         self.assertRaises(Exception, repr, t)
2616         self.assertRaisesRegex((ValueError, AttributeError),
2617                                'uninitialized|has no attribute',
2618                                t.read, 0)
2619         t.__init__(self.MockRawIO(), encoding="utf-8")
2620         self.assertEqual(t.read(0), '')
2621 
2622     def test_non_text_encoding_codecs_are_rejected(self):
2623         # Ensure the constructor complains if passed a codec that isn't
2624         # marked as a text encoding
2625         # http://bugs.python.org/issue20404
2626         r = self.BytesIO()
2627         b = self.BufferedWriter(r)
2628         with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2629             self.TextIOWrapper(b, encoding="hex")
2630 
2631     def test_detach(self):
2632         r = self.BytesIO()
2633         b = self.BufferedWriter(r)
2634         t = self.TextIOWrapper(b, encoding="ascii")
2635         self.assertIs(t.detach(), b)
2636 
2637         t = self.TextIOWrapper(b, encoding="ascii")
2638         t.write("howdy")
2639         self.assertFalse(r.getvalue())
2640         t.detach()
2641         self.assertEqual(r.getvalue(), b"howdy")
2642         self.assertRaises(ValueError, t.detach)
2643 
2644         # Operations independent of the detached stream should still work
2645         repr(t)
2646         self.assertEqual(t.encoding, "ascii")
2647         self.assertEqual(t.errors, "strict")
2648         self.assertFalse(t.line_buffering)
2649         self.assertFalse(t.write_through)
2650 
2651     def test_repr(self):
2652         raw = self.BytesIO("hello".encode("utf-8"))
2653         b = self.BufferedReader(raw)
2654         t = self.TextIOWrapper(b, encoding="utf-8")
2655         modname = self.TextIOWrapper.__module__
2656         self.assertRegex(repr(t),
2657                          r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2658         raw.name = "dummy"
2659         self.assertRegex(repr(t),
2660                          r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2661         t.mode = "r"
2662         self.assertRegex(repr(t),
2663                          r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2664         raw.name = b"dummy"
2665         self.assertRegex(repr(t),
2666                          r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2667 
2668         t.buffer.detach()
2669         repr(t)  # Should not raise an exception
2670 
2671     def test_recursive_repr(self):
2672         # Issue #25455
2673         raw = self.BytesIO()
2674         t = self.TextIOWrapper(raw, encoding="utf-8")
2675         with support.swap_attr(raw, 'name', t):
2676             try:
2677                 repr(t)  # Should not crash
2678             except RuntimeError:
2679                 pass
2680 
2681     def test_line_buffering(self):
2682         r = self.BytesIO()
2683         b = self.BufferedWriter(r, 1000)
2684         t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
2685         t.write("X")
2686         self.assertEqual(r.getvalue(), b"")  # No flush happened
2687         t.write("Y\nZ")
2688         self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2689         t.write("A\rB")
2690         self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2691 
2692     def test_reconfigure_line_buffering(self):
2693         r = self.BytesIO()
2694         b = self.BufferedWriter(r, 1000)
2695         t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
2696         t.write("AB\nC")
2697         self.assertEqual(r.getvalue(), b"")
2698 
2699         t.reconfigure(line_buffering=True)   # implicit flush
2700         self.assertEqual(r.getvalue(), b"AB\nC")
2701         t.write("DEF\nG")
2702         self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2703         t.write("H")
2704         self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2705         t.reconfigure(line_buffering=False)   # implicit flush
2706         self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2707         t.write("IJ")
2708         self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2709 
2710         # Keeping default value
2711         t.reconfigure()
2712         t.reconfigure(line_buffering=None)
2713         self.assertEqual(t.line_buffering, False)
2714         t.reconfigure(line_buffering=True)
2715         t.reconfigure()
2716         t.reconfigure(line_buffering=None)
2717         self.assertEqual(t.line_buffering, True)
2718 
2719     @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2720     def test_default_encoding(self):
2721         old_environ = dict(os.environ)
2722         try:
2723             # try to get a user preferred encoding different than the current
2724             # locale encoding to check that TextIOWrapper() uses the current
2725             # locale encoding and not the user preferred encoding
2726             for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2727                 if key in os.environ:
2728                     del os.environ[key]
2729 
2730             current_locale_encoding = locale.getpreferredencoding(False)
2731             b = self.BytesIO()
2732             with warnings.catch_warnings():
2733                 warnings.simplefilter("ignore", EncodingWarning)
2734                 t = self.TextIOWrapper(b)
2735             self.assertEqual(t.encoding, current_locale_encoding)
2736         finally:
2737             os.environ.clear()
2738             os.environ.update(old_environ)
2739 
2740     @support.cpython_only
2741     @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2742     def test_device_encoding(self):
2743         # Issue 15989
2744         import _testcapi
2745         b = self.BytesIO()
2746         b.fileno = lambda: _testcapi.INT_MAX + 1
2747         self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
2748         b.fileno = lambda: _testcapi.UINT_MAX + 1
2749         self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
2750 
2751     def test_encoding(self):
2752         # Check the encoding attribute is always set, and valid
2753         b = self.BytesIO()
2754         t = self.TextIOWrapper(b, encoding="utf-8")
2755         self.assertEqual(t.encoding, "utf-8")
2756         with warnings.catch_warnings():
2757             warnings.simplefilter("ignore", EncodingWarning)
2758             t = self.TextIOWrapper(b)
2759         self.assertIsNotNone(t.encoding)
2760         codecs.lookup(t.encoding)
2761 
2762     def test_encoding_errors_reading(self):
2763         # (1) default
2764         b = self.BytesIO(b"abc\n\xff\n")
2765         t = self.TextIOWrapper(b, encoding="ascii")
2766         self.assertRaises(UnicodeError, t.read)
2767         # (2) explicit strict
2768         b = self.BytesIO(b"abc\n\xff\n")
2769         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2770         self.assertRaises(UnicodeError, t.read)
2771         # (3) ignore
2772         b = self.BytesIO(b"abc\n\xff\n")
2773         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2774         self.assertEqual(t.read(), "abc\n\n")
2775         # (4) replace
2776         b = self.BytesIO(b"abc\n\xff\n")
2777         t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2778         self.assertEqual(t.read(), "abc\n\ufffd\n")
2779 
2780     def test_encoding_errors_writing(self):
2781         # (1) default
2782         b = self.BytesIO()
2783         t = self.TextIOWrapper(b, encoding="ascii")
2784         self.assertRaises(UnicodeError, t.write, "\xff")
2785         # (2) explicit strict
2786         b = self.BytesIO()
2787         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2788         self.assertRaises(UnicodeError, t.write, "\xff")
2789         # (3) ignore
2790         b = self.BytesIO()
2791         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2792                              newline="\n")
2793         t.write("abc\xffdef\n")
2794         t.flush()
2795         self.assertEqual(b.getvalue(), b"abcdef\n")
2796         # (4) replace
2797         b = self.BytesIO()
2798         t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2799                              newline="\n")
2800         t.write("abc\xffdef\n")
2801         t.flush()
2802         self.assertEqual(b.getvalue(), b"abc?def\n")
2803 
2804     def test_newlines(self):
2805         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2806 
2807         tests = [
2808             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2809             [ '', input_lines ],
2810             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2811             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2812             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2813         ]
2814         encodings = (
2815             'utf-8', 'latin-1',
2816             'utf-16', 'utf-16-le', 'utf-16-be',
2817             'utf-32', 'utf-32-le', 'utf-32-be',
2818         )
2819 
2820         # Try a range of buffer sizes to test the case where \r is the last
2821         # character in TextIOWrapper._pending_line.
2822         for encoding in encodings:
2823             # XXX: str.encode() should return bytes
2824             data = bytes(''.join(input_lines).encode(encoding))
2825             for do_reads in (False, True):
2826                 for bufsize in range(1, 10):
2827                     for newline, exp_lines in tests:
2828                         bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2829                         textio = self.TextIOWrapper(bufio, newline=newline,
2830                                                   encoding=encoding)
2831                         if do_reads:
2832                             got_lines = []
2833                             while True:
2834                                 c2 = textio.read(2)
2835                                 if c2 == '':
2836                                     break
2837                                 self.assertEqual(len(c2), 2)
2838                                 got_lines.append(c2 + textio.readline())
2839                         else:
2840                             got_lines = list(textio)
2841 
2842                         for got_line, exp_line in zip(got_lines, exp_lines):
2843                             self.assertEqual(got_line, exp_line)
2844                         self.assertEqual(len(got_lines), len(exp_lines))
2845 
2846     def test_newlines_input(self):
2847         testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2848         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2849         for newline, expected in [
2850             (None, normalized.decode("ascii").splitlines(keepends=True)),
2851             ("", testdata.decode("ascii").splitlines(keepends=True)),
2852             ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2853             ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2854             ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2855             ]:
2856             buf = self.BytesIO(testdata)
2857             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2858             self.assertEqual(txt.readlines(), expected)
2859             txt.seek(0)
2860             self.assertEqual(txt.read(), "".join(expected))
2861 
2862     def test_newlines_output(self):
2863         testdict = {
2864             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2865             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2866             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2867             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2868             }
2869         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2870         for newline, expected in tests:
2871             buf = self.BytesIO()
2872             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2873             txt.write("AAA\nB")
2874             txt.write("BB\nCCC\n")
2875             txt.write("X\rY\r\nZ")
2876             txt.flush()
2877             self.assertEqual(buf.closed, False)
2878             self.assertEqual(buf.getvalue(), expected)
2879 
2880     def test_destructor(self):
2881         l = []
2882         base = self.BytesIO
2883         class MyBytesIO(base):
2884             def close(self):
2885                 l.append(self.getvalue())
2886                 base.close(self)
2887         b = MyBytesIO()
2888         t = self.TextIOWrapper(b, encoding="ascii")
2889         t.write("abc")
2890         del t
2891         support.gc_collect()
2892         self.assertEqual([b"abc"], l)
2893 
2894     def test_override_destructor(self):
2895         record = []
2896         class MyTextIO(self.TextIOWrapper):
2897             def __del__(self):
2898                 record.append(1)
2899                 try:
2900                     f = super().__del__
2901                 except AttributeError:
2902                     pass
2903                 else:
2904                     f()
2905             def close(self):
2906                 record.append(2)
2907                 super().close()
2908             def flush(self):
2909                 record.append(3)
2910                 super().flush()
2911         b = self.BytesIO()
2912         t = MyTextIO(b, encoding="ascii")
2913         del t
2914         support.gc_collect()
2915         self.assertEqual(record, [1, 2, 3])
2916 
2917     def test_error_through_destructor(self):
2918         # Test that the exception state is not modified by a destructor,
2919         # even if close() fails.
2920         rawio = self.CloseFailureIO()
2921         with support.catch_unraisable_exception() as cm:
2922             with self.assertRaises(AttributeError):
2923                 self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
2924 
2925             if not IOBASE_EMITS_UNRAISABLE:
2926                 self.assertIsNone(cm.unraisable)
2927             elif cm.unraisable is not None:
2928                 self.assertEqual(cm.unraisable.exc_type, OSError)
2929 
2930     # Systematic tests of the text I/O API
2931 
2932     def test_basic_io(self):
2933         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2934             for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2935                 f = self.open(os_helper.TESTFN, "w+", encoding=enc)
2936                 f._CHUNK_SIZE = chunksize
2937                 self.assertEqual(f.write("abc"), 3)
2938                 f.close()
2939                 f = self.open(os_helper.TESTFN, "r+", encoding=enc)
2940                 f._CHUNK_SIZE = chunksize
2941                 self.assertEqual(f.tell(), 0)
2942                 self.assertEqual(f.read(), "abc")
2943                 cookie = f.tell()
2944                 self.assertEqual(f.seek(0), 0)
2945                 self.assertEqual(f.read(None), "abc")
2946                 f.seek(0)
2947                 self.assertEqual(f.read(2), "ab")
2948                 self.assertEqual(f.read(1), "c")
2949                 self.assertEqual(f.read(1), "")
2950                 self.assertEqual(f.read(), "")
2951                 self.assertEqual(f.tell(), cookie)
2952                 self.assertEqual(f.seek(0), 0)
2953                 self.assertEqual(f.seek(0, 2), cookie)
2954                 self.assertEqual(f.write("def"), 3)
2955                 self.assertEqual(f.seek(cookie), cookie)
2956                 self.assertEqual(f.read(), "def")
2957                 if enc.startswith("utf"):
2958                     self.multi_line_test(f, enc)
2959                 f.close()
2960 
2961     def multi_line_test(self, f, enc):
2962         f.seek(0)
2963         f.truncate()
2964         sample = "s\xff\u0fff\uffff"
2965         wlines = []
2966         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2967             chars = []
2968             for i in range(size):
2969                 chars.append(sample[i % len(sample)])
2970             line = "".join(chars) + "\n"
2971             wlines.append((f.tell(), line))
2972             f.write(line)
2973         f.seek(0)
2974         rlines = []
2975         while True:
2976             pos = f.tell()
2977             line = f.readline()
2978             if not line:
2979                 break
2980             rlines.append((pos, line))
2981         self.assertEqual(rlines, wlines)
2982 
2983     def test_telling(self):
2984         f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
2985         p0 = f.tell()
2986         f.write("\xff\n")
2987         p1 = f.tell()
2988         f.write("\xff\n")
2989         p2 = f.tell()
2990         f.seek(0)
2991         self.assertEqual(f.tell(), p0)
2992         self.assertEqual(f.readline(), "\xff\n")
2993         self.assertEqual(f.tell(), p1)
2994         self.assertEqual(f.readline(), "\xff\n")
2995         self.assertEqual(f.tell(), p2)
2996         f.seek(0)
2997         for line in f:
2998             self.assertEqual(line, "\xff\n")
2999             self.assertRaises(OSError, f.tell)
3000         self.assertEqual(f.tell(), p2)
3001         f.close()
3002 
3003     def test_seeking(self):
3004         chunk_size = _default_chunk_size()
3005         prefix_size = chunk_size - 2
3006         u_prefix = "a" * prefix_size
3007         prefix = bytes(u_prefix.encode("utf-8"))
3008         self.assertEqual(len(u_prefix), len(prefix))
3009         u_suffix = "\u8888\n"
3010         suffix = bytes(u_suffix.encode("utf-8"))
3011         line = prefix + suffix
3012         with self.open(os_helper.TESTFN, "wb") as f:
3013             f.write(line*2)
3014         with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3015             s = f.read(prefix_size)
3016             self.assertEqual(s, str(prefix, "ascii"))
3017             self.assertEqual(f.tell(), prefix_size)
3018             self.assertEqual(f.readline(), u_suffix)
3019 
3020     def test_seeking_too(self):
3021         # Regression test for a specific bug
3022         data = b'\xe0\xbf\xbf\n'
3023         with self.open(os_helper.TESTFN, "wb") as f:
3024             f.write(data)
3025         with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3026             f._CHUNK_SIZE  # Just test that it exists
3027             f._CHUNK_SIZE = 2
3028             f.readline()
3029             f.tell()
3030 
3031     def test_seek_and_tell(self):
3032         #Test seek/tell using the StatefulIncrementalDecoder.
3033         # Make test faster by doing smaller seeks
3034         CHUNK_SIZE = 128
3035 
3036         def test_seek_and_tell_with_data(data, min_pos=0):
3037             """Tell/seek to various points within a data stream and ensure
3038             that the decoded data returned by read() is consistent."""
3039             f = self.open(os_helper.TESTFN, 'wb')
3040             f.write(data)
3041             f.close()
3042             f = self.open(os_helper.TESTFN, encoding='test_decoder')
3043             f._CHUNK_SIZE = CHUNK_SIZE
3044             decoded = f.read()
3045             f.close()
3046 
3047             for i in range(min_pos, len(decoded) + 1): # seek positions
3048                 for j in [1, 5, len(decoded) - i]: # read lengths
3049                     f = self.open(os_helper.TESTFN, encoding='test_decoder')
3050                     self.assertEqual(f.read(i), decoded[:i])
3051                     cookie = f.tell()
3052                     self.assertEqual(f.read(j), decoded[i:i + j])
3053                     f.seek(cookie)
3054                     self.assertEqual(f.read(), decoded[i:])
3055                     f.close()
3056 
3057         # Enable the test decoder.
3058         StatefulIncrementalDecoder.codecEnabled = 1
3059 
3060         # Run the tests.
3061         try:
3062             # Try each test case.
3063             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3064                 test_seek_and_tell_with_data(input)
3065 
3066             # Position each test case so that it crosses a chunk boundary.
3067             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3068                 offset = CHUNK_SIZE - len(input)//2
3069                 prefix = b'.'*offset
3070                 # Don't bother seeking into the prefix (takes too long).
3071                 min_pos = offset*2
3072                 test_seek_and_tell_with_data(prefix + input, min_pos)
3073 
3074         # Ensure our test decoder won't interfere with subsequent tests.
3075         finally:
3076             StatefulIncrementalDecoder.codecEnabled = 0
3077 
3078     def test_multibyte_seek_and_tell(self):
3079         f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
3080         f.write("AB\n\u3046\u3048\n")
3081         f.close()
3082 
3083         f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
3084         self.assertEqual(f.readline(), "AB\n")
3085         p0 = f.tell()
3086         self.assertEqual(f.readline(), "\u3046\u3048\n")
3087         p1 = f.tell()
3088         f.seek(p0)
3089         self.assertEqual(f.readline(), "\u3046\u3048\n")
3090         self.assertEqual(f.tell(), p1)
3091         f.close()
3092 
3093     def test_seek_with_encoder_state(self):
3094         f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
3095         f.write("\u00e6\u0300")
3096         p0 = f.tell()
3097         f.write("\u00e6")
3098         f.seek(p0)
3099         f.write("\u0300")
3100         f.close()
3101 
3102         f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
3103         self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3104         f.close()
3105 
3106     def test_encoded_writes(self):
3107         data = "1234567890"
3108         tests = ("utf-16",
3109                  "utf-16-le",
3110                  "utf-16-be",
3111                  "utf-32",
3112                  "utf-32-le",
3113                  "utf-32-be")
3114         for encoding in tests:
3115             buf = self.BytesIO()
3116             f = self.TextIOWrapper(buf, encoding=encoding)
3117             # Check if the BOM is written only once (see issue1753).
3118             f.write(data)
3119             f.write(data)
3120             f.seek(0)
3121             self.assertEqual(f.read(), data * 2)
3122             f.seek(0)
3123             self.assertEqual(f.read(), data * 2)
3124             self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3125 
3126     def test_unreadable(self):
3127         class UnReadable(self.BytesIO):
3128             def readable(self):
3129                 return False
3130         txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
3131         self.assertRaises(OSError, txt.read)
3132 
3133     def test_read_one_by_one(self):
3134         txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
3135         reads = ""
3136         while True:
3137             c = txt.read(1)
3138             if not c:
3139                 break
3140             reads += c
3141         self.assertEqual(reads, "AA\nBB")
3142 
3143     def test_readlines(self):
3144         txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
3145         self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3146         txt.seek(0)
3147         self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3148         txt.seek(0)
3149         self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3150 
3151     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3152     def test_read_by_chunk(self):
3153         # make sure "\r\n" straddles 128 char boundary.
3154         txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
3155         reads = ""
3156         while True:
3157             c = txt.read(128)
3158             if not c:
3159                 break
3160             reads += c
3161         self.assertEqual(reads, "A"*127+"\nB")
3162 
3163     def test_writelines(self):
3164         l = ['ab', 'cd', 'ef']
3165         buf = self.BytesIO()
3166         txt = self.TextIOWrapper(buf, encoding="utf-8")
3167         txt.writelines(l)
3168         txt.flush()
3169         self.assertEqual(buf.getvalue(), b'abcdef')
3170 
3171     def test_writelines_userlist(self):
3172         l = UserList(['ab', 'cd', 'ef'])
3173         buf = self.BytesIO()
3174         txt = self.TextIOWrapper(buf, encoding="utf-8")
3175         txt.writelines(l)
3176         txt.flush()
3177         self.assertEqual(buf.getvalue(), b'abcdef')
3178 
3179     def test_writelines_error(self):
3180         txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
3181         self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3182         self.assertRaises(TypeError, txt.writelines, None)
3183         self.assertRaises(TypeError, txt.writelines, b'abc')
3184 
3185     def test_issue1395_1(self):
3186         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3187 
3188         # read one char at a time
3189         reads = ""
3190         while True:
3191             c = txt.read(1)
3192             if not c:
3193                 break
3194             reads += c
3195         self.assertEqual(reads, self.normalized)
3196 
3197     def test_issue1395_2(self):
3198         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3199         txt._CHUNK_SIZE = 4
3200 
3201         reads = ""
3202         while True:
3203             c = txt.read(4)
3204             if not c:
3205                 break
3206             reads += c
3207         self.assertEqual(reads, self.normalized)
3208 
3209     def test_issue1395_3(self):
3210         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3211         txt._CHUNK_SIZE = 4
3212 
3213         reads = txt.read(4)
3214         reads += txt.read(4)
3215         reads += txt.readline()
3216         reads += txt.readline()
3217         reads += txt.readline()
3218         self.assertEqual(reads, self.normalized)
3219 
3220     def test_issue1395_4(self):
3221         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3222         txt._CHUNK_SIZE = 4
3223 
3224         reads = txt.read(4)
3225         reads += txt.read()
3226         self.assertEqual(reads, self.normalized)
3227 
3228     def test_issue1395_5(self):
3229         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3230         txt._CHUNK_SIZE = 4
3231 
3232         reads = txt.read(4)
3233         pos = txt.tell()
3234         txt.seek(0)
3235         txt.seek(pos)
3236         self.assertEqual(txt.read(4), "BBB\n")
3237 
3238     def test_issue2282(self):
3239         buffer = self.BytesIO(self.testdata)
3240         txt = self.TextIOWrapper(buffer, encoding="ascii")
3241 
3242         self.assertEqual(buffer.seekable(), txt.seekable())
3243 
3244     def test_append_bom(self):
3245         # The BOM is not written again when appending to a non-empty file
3246         filename = os_helper.TESTFN
3247         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3248             with self.open(filename, 'w', encoding=charset) as f:
3249                 f.write('aaa')
3250                 pos = f.tell()
3251             with self.open(filename, 'rb') as f:
3252                 self.assertEqual(f.read(), 'aaa'.encode(charset))
3253 
3254             with self.open(filename, 'a', encoding=charset) as f:
3255                 f.write('xxx')
3256             with self.open(filename, 'rb') as f:
3257                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3258 
3259     def test_seek_bom(self):
3260         # Same test, but when seeking manually
3261         filename = os_helper.TESTFN
3262         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3263             with self.open(filename, 'w', encoding=charset) as f:
3264                 f.write('aaa')
3265                 pos = f.tell()
3266             with self.open(filename, 'r+', encoding=charset) as f:
3267                 f.seek(pos)
3268                 f.write('zzz')
3269                 f.seek(0)
3270                 f.write('bbb')
3271             with self.open(filename, 'rb') as f:
3272                 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3273 
3274     def test_seek_append_bom(self):
3275         # Same test, but first seek to the start and then to the end
3276         filename = os_helper.TESTFN
3277         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3278             with self.open(filename, 'w', encoding=charset) as f:
3279                 f.write('aaa')
3280             with self.open(filename, 'a', encoding=charset) as f:
3281                 f.seek(0)
3282                 f.seek(0, self.SEEK_END)
3283                 f.write('xxx')
3284             with self.open(filename, 'rb') as f:
3285                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3286 
3287     def test_errors_property(self):
3288         with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
3289             self.assertEqual(f.errors, "strict")
3290         with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
3291             self.assertEqual(f.errors, "replace")
3292 
3293     @support.no_tracing
3294     def test_threads_write(self):
3295         # Issue6750: concurrent writes could duplicate data
3296         event = threading.Event()
3297         with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
3298             def run(n):
3299                 text = "Thread%03d\n" % n
3300                 event.wait()
3301                 f.write(text)
3302             threads = [threading.Thread(target=run, args=(x,))
3303                        for x in range(20)]
3304             with threading_helper.start_threads(threads, event.set):
3305                 time.sleep(0.02)
3306         with self.open(os_helper.TESTFN, encoding="utf-8") as f:
3307             content = f.read()
3308             for n in range(20):
3309                 self.assertEqual(content.count("Thread%03d\n" % n), 1)
3310 
3311     def test_flush_error_on_close(self):
3312         # Test that text file is closed despite failed flush
3313         # and that flush() is called before file closed.
3314         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3315         closed = []
3316         def bad_flush():
3317             closed[:] = [txt.closed, txt.buffer.closed]
3318             raise OSError()
3319         txt.flush = bad_flush
3320         self.assertRaises(OSError, txt.close) # exception not swallowed
3321         self.assertTrue(txt.closed)
3322         self.assertTrue(txt.buffer.closed)
3323         self.assertTrue(closed)      # flush() called
3324         self.assertFalse(closed[0])  # flush() called before file closed
3325         self.assertFalse(closed[1])
3326         txt.flush = lambda: None  # break reference loop
3327 
3328     def test_close_error_on_close(self):
3329         buffer = self.BytesIO(self.testdata)
3330         def bad_flush():
3331             raise OSError('flush')
3332         def bad_close():
3333             raise OSError('close')
3334         buffer.close = bad_close
3335         txt = self.TextIOWrapper(buffer, encoding="ascii")
3336         txt.flush = bad_flush
3337         with self.assertRaises(OSError) as err: # exception not swallowed
3338             txt.close()
3339         self.assertEqual(err.exception.args, ('close',))
3340         self.assertIsInstance(err.exception.__context__, OSError)
3341         self.assertEqual(err.exception.__context__.args, ('flush',))
3342         self.assertFalse(txt.closed)
3343 
3344         # Silence destructor error
3345         buffer.close = lambda: None
3346         txt.flush = lambda: None
3347 
3348     def test_nonnormalized_close_error_on_close(self):
3349         # Issue #21677
3350         buffer = self.BytesIO(self.testdata)
3351         def bad_flush():
3352             raise non_existing_flush
3353         def bad_close():
3354             raise non_existing_close
3355         buffer.close = bad_close
3356         txt = self.TextIOWrapper(buffer, encoding="ascii")
3357         txt.flush = bad_flush
3358         with self.assertRaises(NameError) as err: # exception not swallowed
3359             txt.close()
3360         self.assertIn('non_existing_close', str(err.exception))
3361         self.assertIsInstance(err.exception.__context__, NameError)
3362         self.assertIn('non_existing_flush', str(err.exception.__context__))
3363         self.assertFalse(txt.closed)
3364 
3365         # Silence destructor error
3366         buffer.close = lambda: None
3367         txt.flush = lambda: None
3368 
3369     def test_multi_close(self):
3370         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3371         txt.close()
3372         txt.close()
3373         txt.close()
3374         self.assertRaises(ValueError, txt.flush)
3375 
3376     def test_unseekable(self):
3377         txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
3378         self.assertRaises(self.UnsupportedOperation, txt.tell)
3379         self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3380 
3381     def test_readonly_attributes(self):
3382         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3383         buf = self.BytesIO(self.testdata)
3384         with self.assertRaises(AttributeError):
3385             txt.buffer = buf
3386 
3387     def test_rawio(self):
3388         # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3389         # that subprocess.Popen() can have the required unbuffered
3390         # semantics with universal_newlines=True.
3391         raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3392         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3393         # Reads
3394         self.assertEqual(txt.read(4), 'abcd')
3395         self.assertEqual(txt.readline(), 'efghi\n')
3396         self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3397 
3398     def test_rawio_write_through(self):
3399         # Issue #12591: with write_through=True, writes don't need a flush
3400         raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3401         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3402                                  write_through=True)
3403         txt.write('1')
3404         txt.write('23\n4')
3405         txt.write('5')
3406         self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3407 
3408     def test_bufio_write_through(self):
3409         # Issue #21396: write_through=True doesn't force a flush()
3410         # on the underlying binary buffered object.
3411         flush_called, write_called = [], []
3412         class BufferedWriter(self.BufferedWriter):
3413             def flush(self, *args, **kwargs):
3414                 flush_called.append(True)
3415                 return super().flush(*args, **kwargs)
3416             def write(self, *args, **kwargs):
3417                 write_called.append(True)
3418                 return super().write(*args, **kwargs)
3419 
3420         rawio = self.BytesIO()
3421         data = b"a"
3422         bufio = BufferedWriter(rawio, len(data)*2)
3423         textio = self.TextIOWrapper(bufio, encoding='ascii',
3424                                     write_through=True)
3425         # write to the buffered io but don't overflow the buffer
3426         text = data.decode('ascii')
3427         textio.write(text)
3428 
3429         # buffer.flush is not called with write_through=True
3430         self.assertFalse(flush_called)
3431         # buffer.write *is* called with write_through=True
3432         self.assertTrue(write_called)
3433         self.assertEqual(rawio.getvalue(), b"") # no flush
3434 
3435         write_called = [] # reset
3436         textio.write(text * 10) # total content is larger than bufio buffer
3437         self.assertTrue(write_called)
3438         self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3439 
3440     def test_reconfigure_write_through(self):
3441         raw = self.MockRawIO([])
3442         t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3443         t.write('1')
3444         t.reconfigure(write_through=True)  # implied flush
3445         self.assertEqual(t.write_through, True)
3446         self.assertEqual(b''.join(raw._write_stack), b'1')
3447         t.write('23')
3448         self.assertEqual(b''.join(raw._write_stack), b'123')
3449         t.reconfigure(write_through=False)
3450         self.assertEqual(t.write_through, False)
3451         t.write('45')
3452         t.flush()
3453         self.assertEqual(b''.join(raw._write_stack), b'12345')
3454         # Keeping default value
3455         t.reconfigure()
3456         t.reconfigure(write_through=None)
3457         self.assertEqual(t.write_through, False)
3458         t.reconfigure(write_through=True)
3459         t.reconfigure()
3460         t.reconfigure(write_through=None)
3461         self.assertEqual(t.write_through, True)
3462 
3463     def test_read_nonbytes(self):
3464         # Issue #17106
3465         # Crash when underlying read() returns non-bytes
3466         t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3467         self.assertRaises(TypeError, t.read, 1)
3468         t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3469         self.assertRaises(TypeError, t.readline)
3470         t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3471         self.assertRaises(TypeError, t.read)
3472 
3473     def test_illegal_encoder(self):
3474         # Issue 31271: Calling write() while the return value of encoder's
3475         # encode() is invalid shouldn't cause an assertion failure.
3476         rot13 = codecs.lookup("rot13")
3477         with support.swap_attr(rot13, '_is_text_encoding', True):
3478             t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3479         self.assertRaises(TypeError, t.write, 'bar')
3480 
3481     def test_illegal_decoder(self):
3482         # Issue #17106
3483         # Bypass the early encoding check added in issue 20404
3484         def _make_illegal_wrapper():
3485             quopri = codecs.lookup("quopri")
3486             quopri._is_text_encoding = True
3487             try:
3488                 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3489                                        newline='\n', encoding="quopri")
3490             finally:
3491                 quopri._is_text_encoding = False
3492             return t
3493         # Crash when decoder returns non-string
3494         t = _make_illegal_wrapper()
3495         self.assertRaises(TypeError, t.read, 1)
3496         t = _make_illegal_wrapper()
3497         self.assertRaises(TypeError, t.readline)
3498         t = _make_illegal_wrapper()
3499         self.assertRaises(TypeError, t.read)
3500 
3501         # Issue 31243: calling read() while the return value of decoder's
3502         # getstate() is invalid should neither crash the interpreter nor
3503         # raise a SystemError.
3504         def _make_very_illegal_wrapper(getstate_ret_val):
3505             class BadDecoder:
3506                 def getstate(self):
3507                     return getstate_ret_val
3508             def _get_bad_decoder(dummy):
3509                 return BadDecoder()
3510             quopri = codecs.lookup("quopri")
3511             with support.swap_attr(quopri, 'incrementaldecoder',
3512                                    _get_bad_decoder):
3513                 return _make_illegal_wrapper()
3514         t = _make_very_illegal_wrapper(42)
3515         self.assertRaises(TypeError, t.read, 42)
3516         t = _make_very_illegal_wrapper(())
3517         self.assertRaises(TypeError, t.read, 42)
3518         t = _make_very_illegal_wrapper((1, 2))
3519         self.assertRaises(TypeError, t.read, 42)
3520 
3521     def _check_create_at_shutdown(self, **kwargs):
3522         # Issue #20037: creating a TextIOWrapper at shutdown
3523         # shouldn't crash the interpreter.
3524         iomod = self.io.__name__
3525         code = """if 1:
3526             import codecs
3527             import {iomod} as io
3528 
3529             # Avoid looking up codecs at shutdown
3530             codecs.lookup('utf-8')
3531 
3532             class C:
3533                 def __init__(self):
3534                     self.buf = io.BytesIO()
3535                 def __del__(self):
3536                     io.TextIOWrapper(self.buf, **{kwargs})
3537                     print("ok")
3538             c = C()
3539             """.format(iomod=iomod, kwargs=kwargs)
3540         return assert_python_ok("-c", code)
3541 
3542     def test_create_at_shutdown_without_encoding(self):
3543         rc, out, err = self._check_create_at_shutdown()
3544         if err:
3545             # Can error out with a RuntimeError if the module state
3546             # isn't found.
3547             self.assertIn(self.shutdown_error, err.decode())
3548         else:
3549             self.assertEqual("ok", out.decode().strip())
3550 
3551     def test_create_at_shutdown_with_encoding(self):
3552         rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3553                                                       errors='strict')
3554         self.assertFalse(err)
3555         self.assertEqual("ok", out.decode().strip())
3556 
3557     def test_read_byteslike(self):
3558         r = MemviewBytesIO(b'Just some random string\n')
3559         t = self.TextIOWrapper(r, 'utf-8')
3560 
3561         # TextIOwrapper will not read the full string, because
3562         # we truncate it to a multiple of the native int size
3563         # so that we can construct a more complex memoryview.
3564         bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3565 
3566         self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3567 
3568     def test_issue22849(self):
3569         class F(object):
3570             def readable(self): return True
3571             def writable(self): return True
3572             def seekable(self): return True
3573 
3574         for i in range(10):
3575             try:
3576                 self.TextIOWrapper(F(), encoding='utf-8')
3577             except Exception:
3578                 pass
3579 
3580         F.tell = lambda x: 0
3581         t = self.TextIOWrapper(F(), encoding='utf-8')
3582 
3583     def test_reconfigure_encoding_read(self):
3584         # latin1 -> utf8
3585         # (latin1 can decode utf-8 encoded string)
3586         data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3587         raw = self.BytesIO(data)
3588         txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3589         self.assertEqual(txt.readline(), 'abc\xe9\n')
3590         with self.assertRaises(self.UnsupportedOperation):
3591             txt.reconfigure(encoding='utf-8')
3592         with self.assertRaises(self.UnsupportedOperation):
3593             txt.reconfigure(newline=None)
3594 
3595     def test_reconfigure_write_fromascii(self):
3596         # ascii has a specific encodefunc in the C implementation,
3597         # but utf-8-sig has not. Make sure that we get rid of the
3598         # cached encodefunc when we switch encoders.
3599         raw = self.BytesIO()
3600         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3601         txt.write('foo\n')
3602         txt.reconfigure(encoding='utf-8-sig')
3603         txt.write('\xe9\n')
3604         txt.flush()
3605         self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3606 
3607     def test_reconfigure_write(self):
3608         # latin -> utf8
3609         raw = self.BytesIO()
3610         txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3611         txt.write('abc\xe9\n')
3612         txt.reconfigure(encoding='utf-8')
3613         self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3614         txt.write('d\xe9f\n')
3615         txt.flush()
3616         self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3617 
3618         # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3619         # the file
3620         raw = self.BytesIO()
3621         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622         txt.write('abc\n')
3623         txt.reconfigure(encoding='utf-8-sig')
3624         txt.write('d\xe9f\n')
3625         txt.flush()
3626         self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3627 
3628     def test_reconfigure_write_non_seekable(self):
3629         raw = self.BytesIO()
3630         raw.seekable = lambda: False
3631         raw.seek = None
3632         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3633         txt.write('abc\n')
3634         txt.reconfigure(encoding='utf-8-sig')
3635         txt.write('d\xe9f\n')
3636         txt.flush()
3637 
3638         # If the raw stream is not seekable, there'll be a BOM
3639         self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3640 
3641     def test_reconfigure_defaults(self):
3642         txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3643         txt.reconfigure(encoding=None)
3644         self.assertEqual(txt.encoding, 'ascii')
3645         self.assertEqual(txt.errors, 'replace')
3646         txt.write('LF\n')
3647 
3648         txt.reconfigure(newline='\r\n')
3649         self.assertEqual(txt.encoding, 'ascii')
3650         self.assertEqual(txt.errors, 'replace')
3651 
3652         txt.reconfigure(errors='ignore')
3653         self.assertEqual(txt.encoding, 'ascii')
3654         self.assertEqual(txt.errors, 'ignore')
3655         txt.write('CRLF\n')
3656 
3657         txt.reconfigure(encoding='utf-8', newline=None)
3658         self.assertEqual(txt.errors, 'strict')
3659         txt.seek(0)
3660         self.assertEqual(txt.read(), 'LF\nCRLF\n')
3661 
3662         self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3663 
3664     def test_reconfigure_newline(self):
3665         raw = self.BytesIO(b'CR\rEOF')
3666         txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3667         txt.reconfigure(newline=None)
3668         self.assertEqual(txt.readline(), 'CR\n')
3669         raw = self.BytesIO(b'CR\rEOF')
3670         txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3671         txt.reconfigure(newline='')
3672         self.assertEqual(txt.readline(), 'CR\r')
3673         raw = self.BytesIO(b'CR\rLF\nEOF')
3674         txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3675         txt.reconfigure(newline='\n')
3676         self.assertEqual(txt.readline(), 'CR\rLF\n')
3677         raw = self.BytesIO(b'LF\nCR\rEOF')
3678         txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3679         txt.reconfigure(newline='\r')
3680         self.assertEqual(txt.readline(), 'LF\nCR\r')
3681         raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3682         txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3683         txt.reconfigure(newline='\r\n')
3684         self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3685 
3686         txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3687         txt.reconfigure(newline=None)
3688         txt.write('linesep\n')
3689         txt.reconfigure(newline='')
3690         txt.write('LF\n')
3691         txt.reconfigure(newline='\n')
3692         txt.write('LF\n')
3693         txt.reconfigure(newline='\r')
3694         txt.write('CR\n')
3695         txt.reconfigure(newline='\r\n')
3696         txt.write('CRLF\n')
3697         expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3698         self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3699 
3700     def test_issue25862(self):
3701         # Assertion failures occurred in tell() after read() and write().
3702         t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3703         t.read(1)
3704         t.read()
3705         t.tell()
3706         t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3707         t.read(1)
3708         t.write('x')
3709         t.tell()
3710 
3711 
3712 class MemviewBytesIO(io.BytesIO):
3713     '''A BytesIO object whose read method returns memoryviews
3714        rather than bytes'''
3715 
3716     def read1(self, len_):
3717         return _to_memoryview(super().read1(len_))
3718 
3719     def read(self, len_):
3720         return _to_memoryview(super().read(len_))
3721 
3722 def _to_memoryview(buf):
3723     '''Convert bytes-object *buf* to a non-trivial memoryview'''
3724 
3725     arr = array.array('i')
3726     idx = len(buf) - len(buf) % arr.itemsize
3727     arr.frombytes(buf[:idx])
3728     return memoryview(arr)
3729 
3730 
3731 class CTextIOWrapperTest(TextIOWrapperTest):
3732     io = io
3733     shutdown_error = "LookupError: unknown encoding: ascii"
3734 
3735     def test_initialization(self):
3736         r = self.BytesIO(b"\xc3\xa9\n\n")
3737         b = self.BufferedReader(r, 1000)
3738         t = self.TextIOWrapper(b, encoding="utf-8")
3739         self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
3740         self.assertRaises(ValueError, t.read)
3741 
3742         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3743         self.assertRaises(Exception, repr, t)
3744 
3745     def test_garbage_collection(self):
3746         # C TextIOWrapper objects are collected, and collecting them flushes
3747         # all data to disk.
3748         # The Python version has __del__, so it ends in gc.garbage instead.
3749         with warnings_helper.check_warnings(('', ResourceWarning)):
3750             rawio = io.FileIO(os_helper.TESTFN, "wb")
3751             b = self.BufferedWriter(rawio)
3752             t = self.TextIOWrapper(b, encoding="ascii")
3753             t.write("456def")
3754             t.x = t
3755             wr = weakref.ref(t)
3756             del t
3757             support.gc_collect()
3758         self.assertIsNone(wr(), wr)
3759         with self.open(os_helper.TESTFN, "rb") as f:
3760             self.assertEqual(f.read(), b"456def")
3761 
3762     def test_rwpair_cleared_before_textio(self):
3763         # Issue 13070: TextIOWrapper's finalization would crash when called
3764         # after the reference to the underlying BufferedRWPair's writer got
3765         # cleared by the GC.
3766         for i in range(1000):
3767             b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3768             t1 = self.TextIOWrapper(b1, encoding="ascii")
3769             b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3770             t2 = self.TextIOWrapper(b2, encoding="ascii")
3771             # circular references
3772             t1.buddy = t2
3773             t2.buddy = t1
3774         support.gc_collect()
3775 
3776     def test_del__CHUNK_SIZE_SystemError(self):
3777         t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3778         with self.assertRaises(AttributeError):
3779             del t._CHUNK_SIZE
3780 
3781     def test_internal_buffer_size(self):
3782         # bpo-43260: TextIOWrapper's internal buffer should not store
3783         # data larger than chunk size.
3784         chunk_size = 8192  # default chunk size, updated later
3785 
3786         class MockIO(self.MockRawIO):
3787             def write(self, data):
3788                 if len(data) > chunk_size:
3789                     raise RuntimeError
3790                 return super().write(data)
3791 
3792         buf = MockIO()
3793         t = self.TextIOWrapper(buf, encoding="ascii")
3794         chunk_size = t._CHUNK_SIZE
3795         t.write("abc")
3796         t.write("def")
3797         # default chunk size is 8192 bytes so t don't write data to buf.
3798         self.assertEqual([], buf._write_stack)
3799 
3800         with self.assertRaises(RuntimeError):
3801             t.write("x"*(chunk_size+1))
3802 
3803         self.assertEqual([b"abcdef"], buf._write_stack)
3804         t.write("ghi")
3805         t.write("x"*chunk_size)
3806         self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
3807 
3808 
3809 class PyTextIOWrapperTest(TextIOWrapperTest):
3810     io = pyio
3811     shutdown_error = "LookupError: unknown encoding: ascii"
3812 
3813 
3814 class IncrementalNewlineDecoderTest(unittest.TestCase):
3815 
3816     def check_newline_decoding_utf8(self, decoder):
3817         # UTF-8 specific tests for a newline decoder
3818         def _check_decode(b, s, **kwargs):
3819             # We exercise getstate() / setstate() as well as decode()
3820             state = decoder.getstate()
3821             self.assertEqual(decoder.decode(b, **kwargs), s)
3822             decoder.setstate(state)
3823             self.assertEqual(decoder.decode(b, **kwargs), s)
3824 
3825         _check_decode(b'\xe8\xa2\x88', "\u8888")
3826 
3827         _check_decode(b'\xe8', "")
3828         _check_decode(b'\xa2', "")
3829         _check_decode(b'\x88', "\u8888")
3830 
3831         _check_decode(b'\xe8', "")
3832         _check_decode(b'\xa2', "")
3833         _check_decode(b'\x88', "\u8888")
3834 
3835         _check_decode(b'\xe8', "")
3836         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3837 
3838         decoder.reset()
3839         _check_decode(b'\n', "\n")
3840         _check_decode(b'\r', "")
3841         _check_decode(b'', "\n", final=True)
3842         _check_decode(b'\r', "\n", final=True)
3843 
3844         _check_decode(b'\r', "")
3845         _check_decode(b'a', "\na")
3846 
3847         _check_decode(b'\r\r\n', "\n\n")
3848         _check_decode(b'\r', "")
3849         _check_decode(b'\r', "\n")
3850         _check_decode(b'\na', "\na")
3851 
3852         _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3853         _check_decode(b'\xe8\xa2\x88', "\u8888")
3854         _check_decode(b'\n', "\n")
3855         _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3856         _check_decode(b'\n', "\n")
3857 
3858     def check_newline_decoding(self, decoder, encoding):
3859         result = []
3860         if encoding is not None:
3861             encoder = codecs.getincrementalencoder(encoding)()
3862             def _decode_bytewise(s):
3863                 # Decode one byte at a time
3864                 for b in encoder.encode(s):
3865                     result.append(decoder.decode(bytes([b])))
3866         else:
3867             encoder = None
3868             def _decode_bytewise(s):
3869                 # Decode one char at a time
3870                 for c in s:
3871                     result.append(decoder.decode(c))
3872         self.assertEqual(decoder.newlines, None)
3873         _decode_bytewise("abc\n\r")
3874         self.assertEqual(decoder.newlines, '\n')
3875         _decode_bytewise("\nabc")
3876         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3877         _decode_bytewise("abc\r")
3878         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3879         _decode_bytewise("abc")
3880         self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
3881         _decode_bytewise("abc\r")
3882         self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
3883         decoder.reset()
3884         input = "abc"
3885         if encoder is not None:
3886             encoder.reset()
3887             input = encoder.encode(input)
3888         self.assertEqual(decoder.decode(input), "abc")
3889         self.assertEqual(decoder.newlines, None)
3890 
3891     def test_newline_decoder(self):
3892         encodings = (
3893             # None meaning the IncrementalNewlineDecoder takes unicode input
3894             # rather than bytes input
3895             None, 'utf-8', 'latin-1',
3896             'utf-16', 'utf-16-le', 'utf-16-be',
3897             'utf-32', 'utf-32-le', 'utf-32-be',
3898         )
3899         for enc in encodings:
3900             decoder = enc and codecs.getincrementaldecoder(enc)()
3901             decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3902             self.check_newline_decoding(decoder, enc)
3903         decoder = codecs.getincrementaldecoder("utf-8")()
3904         decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3905         self.check_newline_decoding_utf8(decoder)
3906         self.assertRaises(TypeError, decoder.setstate, 42)
3907 
3908     def test_newline_bytes(self):
3909         # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3910         def _check(dec):
3911             self.assertEqual(dec.newlines, None)
3912             self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3913             self.assertEqual(dec.newlines, None)
3914             self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3915             self.assertEqual(dec.newlines, None)
3916         dec = self.IncrementalNewlineDecoder(None, translate=False)
3917         _check(dec)
3918         dec = self.IncrementalNewlineDecoder(None, translate=True)
3919         _check(dec)
3920 
3921     def test_translate(self):
3922         # issue 35062
3923         for translate in (-2, -1, 1, 2):
3924             decoder = codecs.getincrementaldecoder("utf-8")()
3925             decoder = self.IncrementalNewlineDecoder(decoder, translate)
3926             self.check_newline_decoding_utf8(decoder)
3927         decoder = codecs.getincrementaldecoder("utf-8")()
3928         decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3929         self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3930 
3931 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3932     pass
3933 
3934 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3935     pass
3936 
3937 
3938 # XXX Tests for open()
3939 
3940 class MiscIOTest(unittest.TestCase):
3941 
3942     def tearDown(self):
3943         os_helper.unlink(os_helper.TESTFN)
3944 
3945     def test___all__(self):
3946         for name in self.io.__all__:
3947             obj = getattr(self.io, name, None)
3948             self.assertIsNotNone(obj, name)
3949             if name in ("open", "open_code"):
3950                 continue
3951             elif "error" in name.lower() or name == "UnsupportedOperation":
3952                 self.assertTrue(issubclass(obj, Exception), name)
3953             elif not name.startswith("SEEK_"):
3954                 self.assertTrue(issubclass(obj, self.IOBase))
3955 
3956     def test_attributes(self):
3957         f = self.open(os_helper.TESTFN, "wb", buffering=0)
3958         self.assertEqual(f.mode, "wb")
3959         f.close()
3960 
3961         with warnings_helper.check_warnings(('', DeprecationWarning)):
3962             f = self.open(os_helper.TESTFN, "U", encoding="utf-8")
3963         self.assertEqual(f.name,            os_helper.TESTFN)
3964         self.assertEqual(f.buffer.name,     os_helper.TESTFN)
3965         self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
3966         self.assertEqual(f.mode,            "U")
3967         self.assertEqual(f.buffer.mode,     "rb")
3968         self.assertEqual(f.buffer.raw.mode, "rb")
3969         f.close()
3970 
3971         f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
3972         self.assertEqual(f.mode,            "w+")
3973         self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3974         self.assertEqual(f.buffer.raw.mode, "rb+")
3975 
3976         g = self.open(f.fileno(), "wb", closefd=False)
3977         self.assertEqual(g.mode,     "wb")
3978         self.assertEqual(g.raw.mode, "wb")
3979         self.assertEqual(g.name,     f.fileno())
3980         self.assertEqual(g.raw.name, f.fileno())
3981         f.close()
3982         g.close()
3983 
3984     def test_open_pipe_with_append(self):
3985         # bpo-27805: Ignore ESPIPE from lseek() in open().
3986         r, w = os.pipe()
3987         self.addCleanup(os.close, r)
3988         f = self.open(w, 'a', encoding="utf-8")
3989         self.addCleanup(f.close)
3990         # Check that the file is marked non-seekable. On Windows, however, lseek
3991         # somehow succeeds on pipes.
3992         if sys.platform != 'win32':
3993             self.assertFalse(f.seekable())
3994 
3995     def test_io_after_close(self):
3996         for kwargs in [
3997                 {"mode": "w"},
3998                 {"mode": "wb"},
3999                 {"mode": "w", "buffering": 1},
4000                 {"mode": "w", "buffering": 2},
4001                 {"mode": "wb", "buffering": 0},
4002                 {"mode": "r"},
4003                 {"mode": "rb"},
4004                 {"mode": "r", "buffering": 1},
4005                 {"mode": "r", "buffering": 2},
4006                 {"mode": "rb", "buffering": 0},
4007                 {"mode": "w+"},
4008                 {"mode": "w+b"},
4009                 {"mode": "w+", "buffering": 1},
4010                 {"mode": "w+", "buffering": 2},
4011                 {"mode": "w+b", "buffering": 0},
4012             ]:
4013             if "b" not in kwargs["mode"]:
4014                 kwargs["encoding"] = "utf-8"
4015             f = self.open(os_helper.TESTFN, **kwargs)
4016             f.close()
4017             self.assertRaises(ValueError, f.flush)
4018             self.assertRaises(ValueError, f.fileno)
4019             self.assertRaises(ValueError, f.isatty)
4020             self.assertRaises(ValueError, f.__iter__)
4021             if hasattr(f, "peek"):
4022                 self.assertRaises(ValueError, f.peek, 1)
4023             self.assertRaises(ValueError, f.read)
4024             if hasattr(f, "read1"):
4025                 self.assertRaises(ValueError, f.read1, 1024)
4026                 self.assertRaises(ValueError, f.read1)
4027             if hasattr(f, "readall"):
4028                 self.assertRaises(ValueError, f.readall)
4029             if hasattr(f, "readinto"):
4030                 self.assertRaises(ValueError, f.readinto, bytearray(1024))
4031             if hasattr(f, "readinto1"):
4032                 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
4033             self.assertRaises(ValueError, f.readline)
4034             self.assertRaises(ValueError, f.readlines)
4035             self.assertRaises(ValueError, f.readlines, 1)
4036             self.assertRaises(ValueError, f.seek, 0)
4037             self.assertRaises(ValueError, f.tell)
4038             self.assertRaises(ValueError, f.truncate)
4039             self.assertRaises(ValueError, f.write,
4040                               b"" if "b" in kwargs['mode'] else "")
4041             self.assertRaises(ValueError, f.writelines, [])
4042             self.assertRaises(ValueError, next, f)
4043 
4044     def test_blockingioerror(self):
4045         # Various BlockingIOError issues
4046         class C(str):
4047             pass
4048         c = C("")
4049         b = self.BlockingIOError(1, c)
4050         c.b = b
4051         b.c = c
4052         wr = weakref.ref(c)
4053         del c, b
4054         support.gc_collect()
4055         self.assertIsNone(wr(), wr)
4056 
4057     def test_abcs(self):
4058         # Test the visible base classes are ABCs.
4059         self.assertIsInstance(self.IOBase, abc.ABCMeta)
4060         self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4061         self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4062         self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
4063 
4064     def _check_abc_inheritance(self, abcmodule):
4065         with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
4066             self.assertIsInstance(f, abcmodule.IOBase)
4067             self.assertIsInstance(f, abcmodule.RawIOBase)
4068             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4069             self.assertNotIsInstance(f, abcmodule.TextIOBase)
4070         with self.open(os_helper.TESTFN, "wb") as f:
4071             self.assertIsInstance(f, abcmodule.IOBase)
4072             self.assertNotIsInstance(f, abcmodule.RawIOBase)
4073             self.assertIsInstance(f, abcmodule.BufferedIOBase)
4074             self.assertNotIsInstance(f, abcmodule.TextIOBase)
4075         with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
4076             self.assertIsInstance(f, abcmodule.IOBase)
4077             self.assertNotIsInstance(f, abcmodule.RawIOBase)
4078             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4079             self.assertIsInstance(f, abcmodule.TextIOBase)
4080 
4081     def test_abc_inheritance(self):
4082         # Test implementations inherit from their respective ABCs
4083         self._check_abc_inheritance(self)
4084 
4085     def test_abc_inheritance_official(self):
4086         # Test implementations inherit from the official ABCs of the
4087         # baseline "io" module.
4088         self._check_abc_inheritance(io)
4089 
4090     def _check_warn_on_dealloc(self, *args, **kwargs):
4091         f = open(*args, **kwargs)
4092         r = repr(f)
4093         with self.assertWarns(ResourceWarning) as cm:
4094             f = None
4095             support.gc_collect()
4096         self.assertIn(r, str(cm.warning.args[0]))
4097 
4098     def test_warn_on_dealloc(self):
4099         self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4100         self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
4101         self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
4102 
4103     def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4104         fds = []
4105         def cleanup_fds():
4106             for fd in fds:
4107                 try:
4108                     os.close(fd)
4109                 except OSError as e:
4110                     if e.errno != errno.EBADF:
4111                         raise
4112         self.addCleanup(cleanup_fds)
4113         r, w = os.pipe()
4114         fds += r, w
4115         self._check_warn_on_dealloc(r, *args, **kwargs)
4116         # When using closefd=False, there's no warning
4117         r, w = os.pipe()
4118         fds += r, w
4119         with warnings_helper.check_no_resource_warning(self):
4120             open(r, *args, closefd=False, **kwargs)
4121 
4122     def test_warn_on_dealloc_fd(self):
4123         self._check_warn_on_dealloc_fd("rb", buffering=0)
4124         self._check_warn_on_dealloc_fd("rb")
4125         self._check_warn_on_dealloc_fd("r", encoding="utf-8")
4126 
4127 
4128     def test_pickling(self):
4129         # Pickling file objects is forbidden
4130         for kwargs in [
4131                 {"mode": "w"},
4132                 {"mode": "wb"},
4133                 {"mode": "wb", "buffering": 0},
4134                 {"mode": "r"},
4135                 {"mode": "rb"},
4136                 {"mode": "rb", "buffering": 0},
4137                 {"mode": "w+"},
4138                 {"mode": "w+b"},
4139                 {"mode": "w+b", "buffering": 0},
4140             ]:
4141             if "b" not in kwargs["mode"]:
4142                 kwargs["encoding"] = "utf-8"
4143             for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4144                 with self.open(os_helper.TESTFN, **kwargs) as f:
4145                     self.assertRaises(TypeError, pickle.dumps, f, protocol)
4146 
4147     def test_nonblock_pipe_write_bigbuf(self):
4148         self._test_nonblock_pipe_write(16*1024)
4149 
4150     def test_nonblock_pipe_write_smallbuf(self):
4151         self._test_nonblock_pipe_write(1024)
4152 
4153     @unittest.skipUnless(hasattr(os, 'set_blocking'),
4154                          'os.set_blocking() required for this test')
4155     def _test_nonblock_pipe_write(self, bufsize):
4156         sent = []
4157         received = []
4158         r, w = os.pipe()
4159         os.set_blocking(r, False)
4160         os.set_blocking(w, False)
4161 
4162         # To exercise all code paths in the C implementation we need
4163         # to play with buffer sizes.  For instance, if we choose a
4164         # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4165         # then we will never get a partial write of the buffer.
4166         rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4167         wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4168 
4169         with rf, wf:
4170             for N in 9999, 73, 7574:
4171                 try:
4172                     i = 0
4173                     while True:
4174                         msg = bytes([i % 26 + 97]) * N
4175                         sent.append(msg)
4176                         wf.write(msg)
4177                         i += 1
4178 
4179                 except self.BlockingIOError as e:
4180                     self.assertEqual(e.args[0], errno.EAGAIN)
4181                     self.assertEqual(e.args[2], e.characters_written)
4182                     sent[-1] = sent[-1][:e.characters_written]
4183                     received.append(rf.read())
4184                     msg = b'BLOCKED'
4185                     wf.write(msg)
4186                     sent.append(msg)
4187 
4188             while True:
4189                 try:
4190                     wf.flush()
4191                     break
4192                 except self.BlockingIOError as e:
4193                     self.assertEqual(e.args[0], errno.EAGAIN)
4194                     self.assertEqual(e.args[2], e.characters_written)
4195                     self.assertEqual(e.characters_written, 0)
4196                     received.append(rf.read())
4197 
4198             received += iter(rf.read, None)
4199 
4200         sent, received = b''.join(sent), b''.join(received)
4201         self.assertEqual(sent, received)
4202         self.assertTrue(wf.closed)
4203         self.assertTrue(rf.closed)
4204 
4205     def test_create_fail(self):
4206         # 'x' mode fails if file is existing
4207         with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
4208             pass
4209         self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
4210 
4211     def test_create_writes(self):
4212         # 'x' mode opens for writing
4213         with self.open(os_helper.TESTFN, 'xb') as f:
4214             f.write(b"spam")
4215         with self.open(os_helper.TESTFN, 'rb') as f:
4216             self.assertEqual(b"spam", f.read())
4217 
4218     def test_open_allargs(self):
4219         # there used to be a buffer overflow in the parser for rawmode
4220         self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
4221 
4222     def test_check_encoding_errors(self):
4223         # bpo-37388: open() and TextIOWrapper must check encoding and errors
4224         # arguments in dev mode
4225         mod = self.io.__name__
4226         filename = __file__
4227         invalid = 'Boom, Shaka Laka, Boom!'
4228         code = textwrap.dedent(f'''
4229             import sys
4230             from {mod} import open, TextIOWrapper
4231 
4232             try:
4233                 open({filename!r}, encoding={invalid!r})
4234             except LookupError:
4235                 pass
4236             else:
4237                 sys.exit(21)
4238 
4239             try:
4240                 open({filename!r}, errors={invalid!r})
4241             except LookupError:
4242                 pass
4243             else:
4244                 sys.exit(22)
4245 
4246             fp = open({filename!r}, "rb")
4247             with fp:
4248                 try:
4249                     TextIOWrapper(fp, encoding={invalid!r})
4250                 except LookupError:
4251                     pass
4252                 else:
4253                     sys.exit(23)
4254 
4255                 try:
4256                     TextIOWrapper(fp, errors={invalid!r})
4257                 except LookupError:
4258                     pass
4259                 else:
4260                     sys.exit(24)
4261 
4262             sys.exit(10)
4263         ''')
4264         proc = assert_python_failure('-X', 'dev', '-c', code)
4265         self.assertEqual(proc.rc, 10, proc)
4266 
4267     def test_check_encoding_warning(self):
4268         # PEP 597: Raise warning when encoding is not specified
4269         # and sys.flags.warn_default_encoding is set.
4270         mod = self.io.__name__
4271         filename = __file__
4272         code = textwrap.dedent(f'''\
4273             import sys
4274             from {mod} import open, TextIOWrapper
4275             import pathlib
4276 
4277             with open({filename!r}) as f:           # line 5
4278                 pass
4279 
4280             pathlib.Path({filename!r}).read_text()  # line 8
4281         ''')
4282         proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
4283         warnings = proc.err.splitlines()
4284         self.assertEqual(len(warnings), 2)
4285         self.assertTrue(
4286             warnings[0].startswith(b"<string>:5: EncodingWarning: "))
4287         self.assertTrue(
4288             warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4289 
4290     @support.cpython_only
4291     # Depending if OpenWrapper was already created or not, the warning is
4292     # emitted or not. For example, the attribute is already created when this
4293     # test is run multiple times.
4294     @warnings_helper.ignore_warnings(category=DeprecationWarning)
4295     def test_openwrapper(self):
4296         self.assertIs(self.io.OpenWrapper, self.io.open)
4297 
4298 
4299 class CMiscIOTest(MiscIOTest):
4300     io = io
4301 
4302     def test_readinto_buffer_overflow(self):
4303         # Issue #18025
4304         class BadReader(self.io.BufferedIOBase):
4305             def read(self, n=-1):
4306                 return b'x' * 10**6
4307         bufio = BadReader()
4308         b = bytearray(2)
4309         self.assertRaises(ValueError, bufio.readinto, b)
4310 
4311     def check_daemon_threads_shutdown_deadlock(self, stream_name):
4312         # Issue #23309: deadlocks at shutdown should be avoided when a
4313         # daemon thread and the main thread both write to a file.
4314         code = """if 1:
4315             import sys
4316             import time
4317             import threading
4318             from test.support import SuppressCrashReport
4319 
4320             file = sys.{stream_name}
4321 
4322             def run():
4323                 while True:
4324                     file.write('.')
4325                     file.flush()
4326 
4327             crash = SuppressCrashReport()
4328             crash.__enter__()
4329             # don't call __exit__(): the crash occurs at Python shutdown
4330 
4331             thread = threading.Thread(target=run)
4332             thread.daemon = True
4333             thread.start()
4334 
4335             time.sleep(0.5)
4336             file.write('!')
4337             file.flush()
4338             """.format_map(locals())
4339         res, _ = run_python_until_end("-c", code)
4340         err = res.err.decode()
4341         if res.rc != 0:
4342             # Failure: should be a fatal error
4343             pattern = (r"Fatal Python error: _enter_buffered_busy: "
4344                        r"could not acquire lock "
4345                        r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4346                        r"at interpreter shutdown, possibly due to "
4347                        r"daemon threads".format_map(locals()))
4348             self.assertRegex(err, pattern)
4349         else:
4350             self.assertFalse(err.strip('.!'))
4351 
4352     def test_daemon_threads_shutdown_stdout_deadlock(self):
4353         self.check_daemon_threads_shutdown_deadlock('stdout')
4354 
4355     def test_daemon_threads_shutdown_stderr_deadlock(self):
4356         self.check_daemon_threads_shutdown_deadlock('stderr')
4357 
4358 
4359 class PyMiscIOTest(MiscIOTest):
4360     io = pyio
4361 
4362 
4363 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4364 class SignalsTest(unittest.TestCase):
4365 
4366     def setUp(self):
4367         self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4368 
4369     def tearDown(self):
4370         signal.signal(signal.SIGALRM, self.oldalrm)
4371 
4372     def alarm_interrupt(self, sig, frame):
4373         1/0
4374 
4375     def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4376         """Check that a partial write, when it gets interrupted, properly
4377         invokes the signal handler, and bubbles up the exception raised
4378         in the latter."""
4379 
4380         # XXX This test has three flaws that appear when objects are
4381         # XXX not reference counted.
4382 
4383         # - if wio.write() happens to trigger a garbage collection,
4384         #   the signal exception may be raised when some __del__
4385         #   method is running; it will not reach the assertRaises()
4386         #   call.
4387 
4388         # - more subtle, if the wio object is not destroyed at once
4389         #   and survives this function, the next opened file is likely
4390         #   to have the same fileno (since the file descriptor was
4391         #   actively closed).  When wio.__del__ is finally called, it
4392         #   will close the other's test file...  To trigger this with
4393         #   CPython, try adding "global wio" in this function.
4394 
4395         # - This happens only for streams created by the _pyio module,
4396         #   because a wio.close() that fails still consider that the
4397         #   file needs to be closed again.  You can try adding an
4398         #   "assert wio.closed" at the end of the function.
4399 
4400         # Fortunately, a little gc.collect() seems to be enough to
4401         # work around all these issues.
4402         support.gc_collect()  # For PyPy or other GCs.
4403 
4404         read_results = []
4405         def _read():
4406             s = os.read(r, 1)
4407             read_results.append(s)
4408 
4409         t = threading.Thread(target=_read)
4410         t.daemon = True
4411         r, w = os.pipe()
4412         fdopen_kwargs["closefd"] = False
4413         large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
4414         try:
4415             wio = self.io.open(w, **fdopen_kwargs)
4416             if hasattr(signal, 'pthread_sigmask'):
4417                 # create the thread with SIGALRM signal blocked
4418                 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4419                 t.start()
4420                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4421             else:
4422                 t.start()
4423 
4424             # Fill the pipe enough that the write will be blocking.
4425             # It will be interrupted by the timer armed above.  Since the
4426             # other thread has read one byte, the low-level write will
4427             # return with a successful (partial) result rather than an EINTR.
4428             # The buffered IO layer must check for pending signal
4429             # handlers, which in this case will invoke alarm_interrupt().
4430             signal.alarm(1)
4431             try:
4432                 self.assertRaises(ZeroDivisionError, wio.write, large_data)
4433             finally:
4434                 signal.alarm(0)
4435                 t.join()
4436             # We got one byte, get another one and check that it isn't a
4437             # repeat of the first one.
4438             read_results.append(os.read(r, 1))
4439             self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4440         finally:
4441             os.close(w)
4442             os.close(r)
4443             # This is deliberate. If we didn't close the file descriptor
4444             # before closing wio, wio would try to flush its internal
4445             # buffer, and block again.
4446             try:
4447                 wio.close()
4448             except OSError as e:
4449                 if e.errno != errno.EBADF:
4450                     raise
4451 
4452     def test_interrupted_write_unbuffered(self):
4453         self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4454 
4455     def test_interrupted_write_buffered(self):
4456         self.check_interrupted_write(b"xy", b"xy", mode="wb")
4457 
4458     def test_interrupted_write_text(self):
4459         self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4460 
4461     @support.no_tracing
4462     def check_reentrant_write(self, data, **fdopen_kwargs):
4463         def on_alarm(*args):
4464             # Will be called reentrantly from the same thread
4465             wio.write(data)
4466             1/0
4467         signal.signal(signal.SIGALRM, on_alarm)
4468         r, w = os.pipe()
4469         wio = self.io.open(w, **fdopen_kwargs)
4470         try:
4471             signal.alarm(1)
4472             # Either the reentrant call to wio.write() fails with RuntimeError,
4473             # or the signal handler raises ZeroDivisionError.
4474             with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4475                 while 1:
4476                     for i in range(100):
4477                         wio.write(data)
4478                         wio.flush()
4479                     # Make sure the buffer doesn't fill up and block further writes
4480                     os.read(r, len(data) * 100)
4481             exc = cm.exception
4482             if isinstance(exc, RuntimeError):
4483                 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4484         finally:
4485             signal.alarm(0)
4486             wio.close()
4487             os.close(r)
4488 
4489     def test_reentrant_write_buffered(self):
4490         self.check_reentrant_write(b"xy", mode="wb")
4491 
4492     def test_reentrant_write_text(self):
4493         self.check_reentrant_write("xy", mode="w", encoding="ascii")
4494 
4495     def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4496         """Check that a buffered read, when it gets interrupted (either
4497         returning a partial result or EINTR), properly invokes the signal
4498         handler and retries if the latter returned successfully."""
4499         r, w = os.pipe()
4500         fdopen_kwargs["closefd"] = False
4501         def alarm_handler(sig, frame):
4502             os.write(w, b"bar")
4503         signal.signal(signal.SIGALRM, alarm_handler)
4504         try:
4505             rio = self.io.open(r, **fdopen_kwargs)
4506             os.write(w, b"foo")
4507             signal.alarm(1)
4508             # Expected behaviour:
4509             # - first raw read() returns partial b"foo"
4510             # - second raw read() returns EINTR
4511             # - third raw read() returns b"bar"
4512             self.assertEqual(decode(rio.read(6)), "foobar")
4513         finally:
4514             signal.alarm(0)
4515             rio.close()
4516             os.close(w)
4517             os.close(r)
4518 
4519     def test_interrupted_read_retry_buffered(self):
4520         self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4521                                           mode="rb")
4522 
4523     def test_interrupted_read_retry_text(self):
4524         self.check_interrupted_read_retry(lambda x: x,
4525                                           mode="r", encoding="latin1")
4526 
4527     def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4528         """Check that a buffered write, when it gets interrupted (either
4529         returning a partial result or EINTR), properly invokes the signal
4530         handler and retries if the latter returned successfully."""
4531         select = import_helper.import_module("select")
4532 
4533         # A quantity that exceeds the buffer size of an anonymous pipe's
4534         # write end.
4535         N = support.PIPE_MAX_SIZE
4536         r, w = os.pipe()
4537         fdopen_kwargs["closefd"] = False
4538 
4539         # We need a separate thread to read from the pipe and allow the
4540         # write() to finish.  This thread is started after the SIGALRM is
4541         # received (forcing a first EINTR in write()).
4542         read_results = []
4543         write_finished = False
4544         error = None
4545         def _read():
4546             try:
4547                 while not write_finished:
4548                     while r in select.select([r], [], [], 1.0)[0]:
4549                         s = os.read(r, 1024)
4550                         read_results.append(s)
4551             except BaseException as exc:
4552                 nonlocal error
4553                 error = exc
4554         t = threading.Thread(target=_read)
4555         t.daemon = True
4556         def alarm1(sig, frame):
4557             signal.signal(signal.SIGALRM, alarm2)
4558             signal.alarm(1)
4559         def alarm2(sig, frame):
4560             t.start()
4561 
4562         large_data = item * N
4563         signal.signal(signal.SIGALRM, alarm1)
4564         try:
4565             wio = self.io.open(w, **fdopen_kwargs)
4566             signal.alarm(1)
4567             # Expected behaviour:
4568             # - first raw write() is partial (because of the limited pipe buffer
4569             #   and the first alarm)
4570             # - second raw write() returns EINTR (because of the second alarm)
4571             # - subsequent write()s are successful (either partial or complete)
4572             written = wio.write(large_data)
4573             self.assertEqual(N, written)
4574 
4575             wio.flush()
4576             write_finished = True
4577             t.join()
4578 
4579             self.assertIsNone(error)
4580             self.assertEqual(N, sum(len(x) for x in read_results))
4581         finally:
4582             signal.alarm(0)
4583             write_finished = True
4584             os.close(w)
4585             os.close(r)
4586             # This is deliberate. If we didn't close the file descriptor
4587             # before closing wio, wio would try to flush its internal
4588             # buffer, and could block (in case of failure).
4589             try:
4590                 wio.close()
4591             except OSError as e:
4592                 if e.errno != errno.EBADF:
4593                     raise
4594 
4595     def test_interrupted_write_retry_buffered(self):
4596         self.check_interrupted_write_retry(b"x", mode="wb")
4597 
4598     def test_interrupted_write_retry_text(self):
4599         self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4600 
4601 
4602 class CSignalsTest(SignalsTest):
4603     io = io
4604 
4605 class PySignalsTest(SignalsTest):
4606     io = pyio
4607 
4608     # Handling reentrancy issues would slow down _pyio even more, so the
4609     # tests are disabled.
4610     test_reentrant_write_buffered = None
4611     test_reentrant_write_text = None
4612 
4613 
4614 def load_tests(*args):
4615     tests = (CIOTest, PyIOTest, APIMismatchTest,
4616              CBufferedReaderTest, PyBufferedReaderTest,
4617              CBufferedWriterTest, PyBufferedWriterTest,
4618              CBufferedRWPairTest, PyBufferedRWPairTest,
4619              CBufferedRandomTest, PyBufferedRandomTest,
4620              StatefulIncrementalDecoderTest,
4621              CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4622              CTextIOWrapperTest, PyTextIOWrapperTest,
4623              CMiscIOTest, PyMiscIOTest,
4624              CSignalsTest, PySignalsTest,
4625              )
4626 
4627     # Put the namespaces of the IO module we are testing and some useful mock
4628     # classes in the __dict__ of each test.
4629     mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
4630              MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4631              SlowFlushRawIO)
4632     all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4633     c_io_ns = {name : getattr(io, name) for name in all_members}
4634     py_io_ns = {name : getattr(pyio, name) for name in all_members}
4635     globs = globals()
4636     c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4637     py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4638     for test in tests:
4639         if test.__name__.startswith("C"):
4640             for name, obj in c_io_ns.items():
4641                 setattr(test, name, obj)
4642         elif test.__name__.startswith("Py"):
4643             for name, obj in py_io_ns.items():
4644                 setattr(test, name, obj)
4645 
4646     suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4647     return suite
4648 
4649 if __name__ == "__main__":
4650     unittest.main()
4651