1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import sysconfig 32import textwrap 33import threading 34import time 35import unittest 36import warnings 37import weakref 38from collections import deque, UserList 39from itertools import cycle, count 40from test import support 41from test.support.script_helper import ( 42 assert_python_ok, assert_python_failure, run_python_until_end) 43from test.support import import_helper 44from test.support import os_helper 45from test.support import threading_helper 46from test.support import warnings_helper 47from test.support.os_helper import FakePath 48 49import codecs 50import io # C implementation of io 51import _pyio as pyio # Python implementation of io 52 53try: 54 import ctypes 55except ImportError: 56 def byteslike(*pos, **kw): 57 return array.array("b", bytes(*pos, **kw)) 58else: 59 def 
byteslike(*pos, **kw): 60 """Create a bytes-like object having no string or sequence methods""" 61 data = bytes(*pos, **kw) 62 obj = EmptyStruct() 63 ctypes.resize(obj, len(data)) 64 memoryview(obj).cast("B")[:] = data 65 return obj 66 class EmptyStruct(ctypes.Structure): 67 pass 68 69_cflags = sysconfig.get_config_var('CFLAGS') or '' 70_config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 71MEMORY_SANITIZER = ( 72 '-fsanitize=memory' in _cflags or 73 '--with-memory-sanitizer' in _config_args 74) 75 76ADDRESS_SANITIZER = ( 77 '-fsanitize=address' in _cflags 78) 79 80# Does io.IOBase finalizer log the exception if the close() method fails? 81# The exception is ignored silently by default in release build. 82IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode) 83 84 85def _default_chunk_size(): 86 """Get the default TextIOWrapper chunk size""" 87 with open(__file__, "r", encoding="latin-1") as f: 88 return f._CHUNK_SIZE 89 90 91class MockRawIOWithoutRead: 92 """A RawIO implementation without read(), so as to exercise the default 93 RawIO.read() which calls readinto().""" 94 95 def __init__(self, read_stack=()): 96 self._read_stack = list(read_stack) 97 self._write_stack = [] 98 self._reads = 0 99 self._extraneous_reads = 0 100 101 def write(self, b): 102 self._write_stack.append(bytes(b)) 103 return len(b) 104 105 def writable(self): 106 return True 107 108 def fileno(self): 109 return 42 110 111 def readable(self): 112 return True 113 114 def seekable(self): 115 return True 116 117 def seek(self, pos, whence): 118 return 0 # wrong but we gotta return something 119 120 def tell(self): 121 return 0 # same comment as above 122 123 def readinto(self, buf): 124 self._reads += 1 125 max_len = len(buf) 126 try: 127 data = self._read_stack[0] 128 except IndexError: 129 self._extraneous_reads += 1 130 return 0 131 if data is None: 132 del self._read_stack[0] 133 return None 134 n = len(data) 135 if len(data) <= max_len: 136 del 
self._read_stack[0] 137 buf[:n] = data 138 return n 139 else: 140 buf[:] = data[:max_len] 141 self._read_stack[0] = data[max_len:] 142 return max_len 143 144 def truncate(self, pos=None): 145 return pos 146 147class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 148 pass 149 150class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 151 pass 152 153 154class MockRawIO(MockRawIOWithoutRead): 155 156 def read(self, n=None): 157 self._reads += 1 158 try: 159 return self._read_stack.pop(0) 160 except: 161 self._extraneous_reads += 1 162 return b"" 163 164class CMockRawIO(MockRawIO, io.RawIOBase): 165 pass 166 167class PyMockRawIO(MockRawIO, pyio.RawIOBase): 168 pass 169 170 171class MisbehavedRawIO(MockRawIO): 172 def write(self, b): 173 return super().write(b) * 2 174 175 def read(self, n=None): 176 return super().read(n) * 2 177 178 def seek(self, pos, whence): 179 return -123 180 181 def tell(self): 182 return -456 183 184 def readinto(self, buf): 185 super().readinto(buf) 186 return len(buf) * 5 187 188class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 189 pass 190 191class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 192 pass 193 194 195class SlowFlushRawIO(MockRawIO): 196 def __init__(self): 197 super().__init__() 198 self.in_flush = threading.Event() 199 200 def flush(self): 201 self.in_flush.set() 202 time.sleep(0.25) 203 204class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 205 pass 206 207class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 208 pass 209 210 211class CloseFailureIO(MockRawIO): 212 closed = 0 213 214 def close(self): 215 if not self.closed: 216 self.closed = 1 217 raise OSError 218 219class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 220 pass 221 222class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 223 pass 224 225 226class MockFileIO: 227 228 def __init__(self, data): 229 self.read_history = [] 230 super().__init__(data) 231 232 def read(self, n=None): 233 res = super().read(n) 234 
self.read_history.append(None if res is None else len(res)) 235 return res 236 237 def readinto(self, b): 238 res = super().readinto(b) 239 self.read_history.append(res) 240 return res 241 242class CMockFileIO(MockFileIO, io.BytesIO): 243 pass 244 245class PyMockFileIO(MockFileIO, pyio.BytesIO): 246 pass 247 248 249class MockUnseekableIO: 250 def seekable(self): 251 return False 252 253 def seek(self, *args): 254 raise self.UnsupportedOperation("not seekable") 255 256 def tell(self, *args): 257 raise self.UnsupportedOperation("not seekable") 258 259 def truncate(self, *args): 260 raise self.UnsupportedOperation("not seekable") 261 262class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 263 UnsupportedOperation = io.UnsupportedOperation 264 265class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 266 UnsupportedOperation = pyio.UnsupportedOperation 267 268 269class MockNonBlockWriterIO: 270 271 def __init__(self): 272 self._write_stack = [] 273 self._blocker_char = None 274 275 def pop_written(self): 276 s = b"".join(self._write_stack) 277 self._write_stack[:] = [] 278 return s 279 280 def block_on(self, char): 281 """Block when a given char is encountered.""" 282 self._blocker_char = char 283 284 def readable(self): 285 return True 286 287 def seekable(self): 288 return True 289 290 def seek(self, pos, whence=0): 291 # naive implementation, enough for tests 292 return 0 293 294 def writable(self): 295 return True 296 297 def write(self, b): 298 b = bytes(b) 299 n = -1 300 if self._blocker_char: 301 try: 302 n = b.index(self._blocker_char) 303 except ValueError: 304 pass 305 else: 306 if n > 0: 307 # write data up to the first blocker 308 self._write_stack.append(b[:n]) 309 return n 310 else: 311 # cancel blocker and indicate would block 312 self._blocker_char = None 313 return None 314 self._write_stack.append(b) 315 return len(b) 316 317class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 318 BlockingIOError = io.BlockingIOError 319 320class 
PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 321 BlockingIOError = pyio.BlockingIOError 322 323 324class IOTest(unittest.TestCase): 325 326 def setUp(self): 327 os_helper.unlink(os_helper.TESTFN) 328 329 def tearDown(self): 330 os_helper.unlink(os_helper.TESTFN) 331 332 def write_ops(self, f): 333 self.assertEqual(f.write(b"blah."), 5) 334 f.truncate(0) 335 self.assertEqual(f.tell(), 5) 336 f.seek(0) 337 338 self.assertEqual(f.write(b"blah."), 5) 339 self.assertEqual(f.seek(0), 0) 340 self.assertEqual(f.write(b"Hello."), 6) 341 self.assertEqual(f.tell(), 6) 342 self.assertEqual(f.seek(-1, 1), 5) 343 self.assertEqual(f.tell(), 5) 344 buffer = bytearray(b" world\n\n\n") 345 self.assertEqual(f.write(buffer), 9) 346 buffer[:] = b"*" * 9 # Overwrite our copy of the data 347 self.assertEqual(f.seek(0), 0) 348 self.assertEqual(f.write(b"h"), 1) 349 self.assertEqual(f.seek(-1, 2), 13) 350 self.assertEqual(f.tell(), 13) 351 352 self.assertEqual(f.truncate(12), 12) 353 self.assertEqual(f.tell(), 13) 354 self.assertRaises(TypeError, f.seek, 0.0) 355 356 def read_ops(self, f, buffered=False): 357 data = f.read(5) 358 self.assertEqual(data, b"hello") 359 data = byteslike(data) 360 self.assertEqual(f.readinto(data), 5) 361 self.assertEqual(bytes(data), b" worl") 362 data = bytearray(5) 363 self.assertEqual(f.readinto(data), 2) 364 self.assertEqual(len(data), 5) 365 self.assertEqual(data[:2], b"d\n") 366 self.assertEqual(f.seek(0), 0) 367 self.assertEqual(f.read(20), b"hello world\n") 368 self.assertEqual(f.read(1), b"") 369 self.assertEqual(f.readinto(byteslike(b"x")), 0) 370 self.assertEqual(f.seek(-6, 2), 6) 371 self.assertEqual(f.read(5), b"world") 372 self.assertEqual(f.read(0), b"") 373 self.assertEqual(f.readinto(byteslike()), 0) 374 self.assertEqual(f.seek(-6, 1), 5) 375 self.assertEqual(f.read(5), b" worl") 376 self.assertEqual(f.tell(), 10) 377 self.assertRaises(TypeError, f.seek, 0.0) 378 if buffered: 379 f.seek(0) 380 self.assertEqual(f.read(), 
b"hello world\n") 381 f.seek(6) 382 self.assertEqual(f.read(), b"world\n") 383 self.assertEqual(f.read(), b"") 384 f.seek(0) 385 data = byteslike(5) 386 self.assertEqual(f.readinto1(data), 5) 387 self.assertEqual(bytes(data), b"hello") 388 389 LARGE = 2**31 390 391 def large_file_ops(self, f): 392 assert f.readable() 393 assert f.writable() 394 try: 395 self.assertEqual(f.seek(self.LARGE), self.LARGE) 396 except (OverflowError, ValueError): 397 self.skipTest("no largefile support") 398 self.assertEqual(f.tell(), self.LARGE) 399 self.assertEqual(f.write(b"xxx"), 3) 400 self.assertEqual(f.tell(), self.LARGE + 3) 401 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 402 self.assertEqual(f.truncate(), self.LARGE + 2) 403 self.assertEqual(f.tell(), self.LARGE + 2) 404 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 405 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 406 self.assertEqual(f.tell(), self.LARGE + 2) 407 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 408 self.assertEqual(f.seek(-1, 2), self.LARGE) 409 self.assertEqual(f.read(2), b"x") 410 411 def test_invalid_operations(self): 412 # Try writing on a file opened in read mode and vice-versa. 
413 exc = self.UnsupportedOperation 414 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp: 415 self.assertRaises(exc, fp.read) 416 self.assertRaises(exc, fp.readline) 417 with self.open(os_helper.TESTFN, "wb") as fp: 418 self.assertRaises(exc, fp.read) 419 self.assertRaises(exc, fp.readline) 420 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp: 421 self.assertRaises(exc, fp.read) 422 self.assertRaises(exc, fp.readline) 423 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp: 424 self.assertRaises(exc, fp.write, b"blah") 425 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 426 with self.open(os_helper.TESTFN, "rb") as fp: 427 self.assertRaises(exc, fp.write, b"blah") 428 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 429 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp: 430 self.assertRaises(exc, fp.write, "blah") 431 self.assertRaises(exc, fp.writelines, ["blah\n"]) 432 # Non-zero seeking from current or end pos 433 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 434 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 435 436 def test_optional_abilities(self): 437 # Test for OSError when optional APIs are not supported 438 # The purpose of this test is to try fileno(), reading, writing and 439 # seeking operations with various objects that indicate they do not 440 # support these operations. 
441 442 def pipe_reader(): 443 [r, w] = os.pipe() 444 os.close(w) # So that read() is harmless 445 return self.FileIO(r, "r") 446 447 def pipe_writer(): 448 [r, w] = os.pipe() 449 self.addCleanup(os.close, r) 450 # Guarantee that we can write into the pipe without blocking 451 thread = threading.Thread(target=os.read, args=(r, 100)) 452 thread.start() 453 self.addCleanup(thread.join) 454 return self.FileIO(w, "w") 455 456 def buffered_reader(): 457 return self.BufferedReader(self.MockUnseekableIO()) 458 459 def buffered_writer(): 460 return self.BufferedWriter(self.MockUnseekableIO()) 461 462 def buffered_random(): 463 return self.BufferedRandom(self.BytesIO()) 464 465 def buffered_rw_pair(): 466 return self.BufferedRWPair(self.MockUnseekableIO(), 467 self.MockUnseekableIO()) 468 469 def text_reader(): 470 class UnseekableReader(self.MockUnseekableIO): 471 writable = self.BufferedIOBase.writable 472 write = self.BufferedIOBase.write 473 return self.TextIOWrapper(UnseekableReader(), "ascii") 474 475 def text_writer(): 476 class UnseekableWriter(self.MockUnseekableIO): 477 readable = self.BufferedIOBase.readable 478 read = self.BufferedIOBase.read 479 return self.TextIOWrapper(UnseekableWriter(), "ascii") 480 481 tests = ( 482 (pipe_reader, "fr"), (pipe_writer, "fw"), 483 (buffered_reader, "r"), (buffered_writer, "w"), 484 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 485 (text_reader, "r"), (text_writer, "w"), 486 (self.BytesIO, "rws"), (self.StringIO, "rws"), 487 ) 488 for [test, abilities] in tests: 489 with self.subTest(test), test() as obj: 490 readable = "r" in abilities 491 self.assertEqual(obj.readable(), readable) 492 writable = "w" in abilities 493 self.assertEqual(obj.writable(), writable) 494 495 if isinstance(obj, self.TextIOBase): 496 data = "3" 497 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 498 data = b"3" 499 else: 500 self.fail("Unknown base class") 501 502 if "f" in abilities: 503 obj.fileno() 504 else: 505 
self.assertRaises(OSError, obj.fileno) 506 507 if readable: 508 obj.read(1) 509 obj.read() 510 else: 511 self.assertRaises(OSError, obj.read, 1) 512 self.assertRaises(OSError, obj.read) 513 514 if writable: 515 obj.write(data) 516 else: 517 self.assertRaises(OSError, obj.write, data) 518 519 if sys.platform.startswith("win") and test in ( 520 pipe_reader, pipe_writer): 521 # Pipes seem to appear as seekable on Windows 522 continue 523 seekable = "s" in abilities 524 self.assertEqual(obj.seekable(), seekable) 525 526 if seekable: 527 obj.tell() 528 obj.seek(0) 529 else: 530 self.assertRaises(OSError, obj.tell) 531 self.assertRaises(OSError, obj.seek, 0) 532 533 if writable and seekable: 534 obj.truncate() 535 obj.truncate(0) 536 else: 537 self.assertRaises(OSError, obj.truncate) 538 self.assertRaises(OSError, obj.truncate, 0) 539 540 def test_open_handles_NUL_chars(self): 541 fn_with_NUL = 'foo\0bar' 542 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8") 543 544 bytes_fn = bytes(fn_with_NUL, 'ascii') 545 with warnings.catch_warnings(): 546 warnings.simplefilter("ignore", DeprecationWarning) 547 self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8") 548 549 def test_raw_file_io(self): 550 with self.open(os_helper.TESTFN, "wb", buffering=0) as f: 551 self.assertEqual(f.readable(), False) 552 self.assertEqual(f.writable(), True) 553 self.assertEqual(f.seekable(), True) 554 self.write_ops(f) 555 with self.open(os_helper.TESTFN, "rb", buffering=0) as f: 556 self.assertEqual(f.readable(), True) 557 self.assertEqual(f.writable(), False) 558 self.assertEqual(f.seekable(), True) 559 self.read_ops(f) 560 561 def test_buffered_file_io(self): 562 with self.open(os_helper.TESTFN, "wb") as f: 563 self.assertEqual(f.readable(), False) 564 self.assertEqual(f.writable(), True) 565 self.assertEqual(f.seekable(), True) 566 self.write_ops(f) 567 with self.open(os_helper.TESTFN, "rb") as f: 568 self.assertEqual(f.readable(), True) 569 
self.assertEqual(f.writable(), False) 570 self.assertEqual(f.seekable(), True) 571 self.read_ops(f, True) 572 573 def test_readline(self): 574 with self.open(os_helper.TESTFN, "wb") as f: 575 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 576 with self.open(os_helper.TESTFN, "rb") as f: 577 self.assertEqual(f.readline(), b"abc\n") 578 self.assertEqual(f.readline(10), b"def\n") 579 self.assertEqual(f.readline(2), b"xy") 580 self.assertEqual(f.readline(4), b"zzy\n") 581 self.assertEqual(f.readline(), b"foo\x00bar\n") 582 self.assertEqual(f.readline(None), b"another line") 583 self.assertRaises(TypeError, f.readline, 5.3) 584 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 585 self.assertRaises(TypeError, f.readline, 5.3) 586 587 def test_readline_nonsizeable(self): 588 # Issue #30061 589 # Crash when readline() returns an object without __len__ 590 class R(self.IOBase): 591 def readline(self): 592 return None 593 self.assertRaises((TypeError, StopIteration), next, R()) 594 595 def test_next_nonsizeable(self): 596 # Issue #30061 597 # Crash when __next__() returns an object without __len__ 598 class R(self.IOBase): 599 def __next__(self): 600 return None 601 self.assertRaises(TypeError, R().readlines, 1) 602 603 def test_raw_bytes_io(self): 604 f = self.BytesIO() 605 self.write_ops(f) 606 data = f.getvalue() 607 self.assertEqual(data, b"hello world\n") 608 f = self.BytesIO(data) 609 self.read_ops(f, True) 610 611 def test_large_file_ops(self): 612 # On Windows and Mac OSX this test consumes large resources; It takes 613 # a long time to build the >2 GiB file and takes >2 GiB of disk space 614 # therefore the resource must be enabled to run this test. 
615 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 616 support.requires( 617 'largefile', 618 'test requires %s bytes and a long time to run' % self.LARGE) 619 with self.open(os_helper.TESTFN, "w+b", 0) as f: 620 self.large_file_ops(f) 621 with self.open(os_helper.TESTFN, "w+b") as f: 622 self.large_file_ops(f) 623 624 def test_with_open(self): 625 for bufsize in (0, 100): 626 f = None 627 with self.open(os_helper.TESTFN, "wb", bufsize) as f: 628 f.write(b"xxx") 629 self.assertEqual(f.closed, True) 630 f = None 631 try: 632 with self.open(os_helper.TESTFN, "wb", bufsize) as f: 633 1/0 634 except ZeroDivisionError: 635 self.assertEqual(f.closed, True) 636 else: 637 self.fail("1/0 didn't raise an exception") 638 639 # issue 5008 640 def test_append_mode_tell(self): 641 with self.open(os_helper.TESTFN, "wb") as f: 642 f.write(b"xxx") 643 with self.open(os_helper.TESTFN, "ab", buffering=0) as f: 644 self.assertEqual(f.tell(), 3) 645 with self.open(os_helper.TESTFN, "ab") as f: 646 self.assertEqual(f.tell(), 3) 647 with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f: 648 self.assertGreater(f.tell(), 0) 649 650 def test_destructor(self): 651 record = [] 652 class MyFileIO(self.FileIO): 653 def __del__(self): 654 record.append(1) 655 try: 656 f = super().__del__ 657 except AttributeError: 658 pass 659 else: 660 f() 661 def close(self): 662 record.append(2) 663 super().close() 664 def flush(self): 665 record.append(3) 666 super().flush() 667 with warnings_helper.check_warnings(('', ResourceWarning)): 668 f = MyFileIO(os_helper.TESTFN, "wb") 669 f.write(b"xxx") 670 del f 671 support.gc_collect() 672 self.assertEqual(record, [1, 2, 3]) 673 with self.open(os_helper.TESTFN, "rb") as f: 674 self.assertEqual(f.read(), b"xxx") 675 676 def _check_base_destructor(self, base): 677 record = [] 678 class MyIO(base): 679 def __init__(self): 680 # This exercises the availability of attributes on object 681 # destruction. 
682 # (in the C version, close() is called by the tp_dealloc 683 # function, not by __del__) 684 self.on_del = 1 685 self.on_close = 2 686 self.on_flush = 3 687 def __del__(self): 688 record.append(self.on_del) 689 try: 690 f = super().__del__ 691 except AttributeError: 692 pass 693 else: 694 f() 695 def close(self): 696 record.append(self.on_close) 697 super().close() 698 def flush(self): 699 record.append(self.on_flush) 700 super().flush() 701 f = MyIO() 702 del f 703 support.gc_collect() 704 self.assertEqual(record, [1, 2, 3]) 705 706 def test_IOBase_destructor(self): 707 self._check_base_destructor(self.IOBase) 708 709 def test_RawIOBase_destructor(self): 710 self._check_base_destructor(self.RawIOBase) 711 712 def test_BufferedIOBase_destructor(self): 713 self._check_base_destructor(self.BufferedIOBase) 714 715 def test_TextIOBase_destructor(self): 716 self._check_base_destructor(self.TextIOBase) 717 718 def test_close_flushes(self): 719 with self.open(os_helper.TESTFN, "wb") as f: 720 f.write(b"xxx") 721 with self.open(os_helper.TESTFN, "rb") as f: 722 self.assertEqual(f.read(), b"xxx") 723 724 def test_array_writes(self): 725 a = array.array('i', range(10)) 726 n = len(a.tobytes()) 727 def check(f): 728 with f: 729 self.assertEqual(f.write(a), n) 730 f.writelines((a,)) 731 check(self.BytesIO()) 732 check(self.FileIO(os_helper.TESTFN, "w")) 733 check(self.BufferedWriter(self.MockRawIO())) 734 check(self.BufferedRandom(self.MockRawIO())) 735 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 736 737 def test_closefd(self): 738 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w', 739 encoding="utf-8", closefd=False) 740 741 def test_read_closed(self): 742 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 743 f.write("egg\n") 744 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 745 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) 746 self.assertEqual(file.read(), "egg\n") 747 file.seek(0) 748 
file.close() 749 self.assertRaises(ValueError, file.read) 750 with self.open(os_helper.TESTFN, "rb") as f: 751 file = self.open(f.fileno(), "rb", closefd=False) 752 self.assertEqual(file.read()[:3], b"egg") 753 file.close() 754 self.assertRaises(ValueError, file.readinto, bytearray(1)) 755 756 def test_no_closefd_with_filename(self): 757 # can't use closefd in combination with a file name 758 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", 759 encoding="utf-8", closefd=False) 760 761 def test_closefd_attr(self): 762 with self.open(os_helper.TESTFN, "wb") as f: 763 f.write(b"egg\n") 764 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 765 self.assertEqual(f.buffer.raw.closefd, True) 766 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) 767 self.assertEqual(file.buffer.raw.closefd, False) 768 769 def test_garbage_collection(self): 770 # FileIO objects are collected, and collecting them flushes 771 # all data to disk. 772 with warnings_helper.check_warnings(('', ResourceWarning)): 773 f = self.FileIO(os_helper.TESTFN, "wb") 774 f.write(b"abcxxx") 775 f.f = f 776 wr = weakref.ref(f) 777 del f 778 support.gc_collect() 779 self.assertIsNone(wr(), wr) 780 with self.open(os_helper.TESTFN, "rb") as f: 781 self.assertEqual(f.read(), b"abcxxx") 782 783 def test_unbounded_file(self): 784 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 
785 zero = "/dev/zero" 786 if not os.path.exists(zero): 787 self.skipTest("{0} does not exist".format(zero)) 788 if sys.maxsize > 0x7FFFFFFF: 789 self.skipTest("test can only run in a 32-bit address space") 790 if support.real_max_memuse < support._2G: 791 self.skipTest("test requires at least 2 GiB of memory") 792 with self.open(zero, "rb", buffering=0) as f: 793 self.assertRaises(OverflowError, f.read) 794 with self.open(zero, "rb") as f: 795 self.assertRaises(OverflowError, f.read) 796 with self.open(zero, "r") as f: 797 self.assertRaises(OverflowError, f.read) 798 799 def check_flush_error_on_close(self, *args, **kwargs): 800 # Test that the file is closed despite failed flush 801 # and that flush() is called before file closed. 802 f = self.open(*args, **kwargs) 803 closed = [] 804 def bad_flush(): 805 closed[:] = [f.closed] 806 raise OSError() 807 f.flush = bad_flush 808 self.assertRaises(OSError, f.close) # exception not swallowed 809 self.assertTrue(f.closed) 810 self.assertTrue(closed) # flush() called 811 self.assertFalse(closed[0]) # flush() called before file closed 812 f.flush = lambda: None # break reference loop 813 814 def test_flush_error_on_close(self): 815 # raw file 816 # Issue #5700: io.FileIO calls flush() after file closed 817 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0) 818 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 819 self.check_flush_error_on_close(fd, 'wb', buffering=0) 820 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 821 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 822 os.close(fd) 823 # buffered io 824 self.check_flush_error_on_close(os_helper.TESTFN, 'wb') 825 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 826 self.check_flush_error_on_close(fd, 'wb') 827 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 828 self.check_flush_error_on_close(fd, 'wb', closefd=False) 829 os.close(fd) 830 # text io 831 self.check_flush_error_on_close(os_helper.TESTFN, 'w', 
encoding="utf-8") 832 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 833 self.check_flush_error_on_close(fd, 'w', encoding="utf-8") 834 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 835 self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False) 836 os.close(fd) 837 838 def test_multi_close(self): 839 f = self.open(os_helper.TESTFN, "wb", buffering=0) 840 f.close() 841 f.close() 842 f.close() 843 self.assertRaises(ValueError, f.flush) 844 845 def test_RawIOBase_read(self): 846 # Exercise the default limited RawIOBase.read(n) implementation (which 847 # calls readinto() internally). 848 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 849 self.assertEqual(rawio.read(2), b"ab") 850 self.assertEqual(rawio.read(2), b"c") 851 self.assertEqual(rawio.read(2), b"d") 852 self.assertEqual(rawio.read(2), None) 853 self.assertEqual(rawio.read(2), b"ef") 854 self.assertEqual(rawio.read(2), b"g") 855 self.assertEqual(rawio.read(2), None) 856 self.assertEqual(rawio.read(2), b"") 857 858 def test_types_have_dict(self): 859 test = ( 860 self.IOBase(), 861 self.RawIOBase(), 862 self.TextIOBase(), 863 self.StringIO(), 864 self.BytesIO() 865 ) 866 for obj in test: 867 self.assertTrue(hasattr(obj, "__dict__")) 868 869 def test_opener(self): 870 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 871 f.write("egg\n") 872 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 873 def opener(path, flags): 874 return fd 875 with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f: 876 self.assertEqual(f.read(), "egg\n") 877 878 def test_bad_opener_negative_1(self): 879 # Issue #27066. 880 def badopener(fname, flags): 881 return -1 882 with self.assertRaises(ValueError) as cm: 883 open('non-existent', 'r', opener=badopener) 884 self.assertEqual(str(cm.exception), 'opener returned -1') 885 886 def test_bad_opener_other_negative(self): 887 # Issue #27066. 
888 def badopener(fname, flags): 889 return -2 890 with self.assertRaises(ValueError) as cm: 891 open('non-existent', 'r', opener=badopener) 892 self.assertEqual(str(cm.exception), 'opener returned -2') 893 894 def test_fileio_closefd(self): 895 # Issue #4841 896 with self.open(__file__, 'rb') as f1, \ 897 self.open(__file__, 'rb') as f2: 898 fileio = self.FileIO(f1.fileno(), closefd=False) 899 # .__init__() must not close f1 900 fileio.__init__(f2.fileno(), closefd=False) 901 f1.readline() 902 # .close() must not close f2 903 fileio.close() 904 f2.readline() 905 906 def test_nonbuffered_textio(self): 907 with warnings_helper.check_no_resource_warning(self): 908 with self.assertRaises(ValueError): 909 self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0) 910 911 def test_invalid_newline(self): 912 with warnings_helper.check_no_resource_warning(self): 913 with self.assertRaises(ValueError): 914 self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid') 915 916 def test_buffered_readinto_mixin(self): 917 # Test the implementation provided by BufferedIOBase 918 class Stream(self.BufferedIOBase): 919 def read(self, size): 920 return b"12345" 921 read1 = read 922 stream = Stream() 923 for method in ("readinto", "readinto1"): 924 with self.subTest(method): 925 buffer = byteslike(5) 926 self.assertEqual(getattr(stream, method)(buffer), 5) 927 self.assertEqual(bytes(buffer), b"12345") 928 929 def test_fspath_support(self): 930 def check_path_succeeds(path): 931 with self.open(path, "w", encoding="utf-8") as f: 932 f.write("egg\n") 933 934 with self.open(path, "r", encoding="utf-8") as f: 935 self.assertEqual(f.read(), "egg\n") 936 937 check_path_succeeds(FakePath(os_helper.TESTFN)) 938 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN))) 939 940 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 941 bad_path = FakePath(f.fileno()) 942 with self.assertRaises(TypeError): 943 self.open(bad_path, 'w', encoding="utf-8") 944 945 bad_path = 
FakePath(None) 946 with self.assertRaises(TypeError): 947 self.open(bad_path, 'w', encoding="utf-8") 948 949 bad_path = FakePath(FloatingPointError) 950 with self.assertRaises(FloatingPointError): 951 self.open(bad_path, 'w', encoding="utf-8") 952 953 # ensure that refcounting is correct with some error conditions 954 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 955 self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8") 956 957 def test_RawIOBase_readall(self): 958 # Exercise the default unlimited RawIOBase.read() and readall() 959 # implementations. 960 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 961 self.assertEqual(rawio.read(), b"abcdefg") 962 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 963 self.assertEqual(rawio.readall(), b"abcdefg") 964 965 def test_BufferedIOBase_readinto(self): 966 # Exercise the default BufferedIOBase.readinto() and readinto1() 967 # implementations (which call read() or read1() internally). 968 class Reader(self.BufferedIOBase): 969 def __init__(self, avail): 970 self.avail = avail 971 def read(self, size): 972 result = self.avail[:size] 973 self.avail = self.avail[size:] 974 return result 975 def read1(self, size): 976 """Returns no more than 5 bytes at once""" 977 return self.read(min(size, 5)) 978 tests = ( 979 # (test method, total data available, read buffer size, expected 980 # read size) 981 ("readinto", 10, 5, 5), 982 ("readinto", 10, 6, 6), # More than read1() can return 983 ("readinto", 5, 6, 5), # Buffer larger than total available 984 ("readinto", 6, 7, 6), 985 ("readinto", 10, 0, 0), # Empty buffer 986 ("readinto1", 10, 5, 5), # Result limited to single read1() call 987 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 988 ("readinto1", 5, 6, 5), # Buffer larger than total available 989 ("readinto1", 6, 7, 5), 990 ("readinto1", 10, 0, 0), # Empty buffer 991 ) 992 UNUSED_BYTE = 0x81 993 for test in tests: 994 with self.subTest(test): 995 method, avail, 
request, result = test 996 reader = Reader(bytes(range(avail))) 997 buffer = bytearray((UNUSED_BYTE,) * request) 998 method = getattr(reader, method) 999 self.assertEqual(method(buffer), result) 1000 self.assertEqual(len(buffer), request) 1001 self.assertSequenceEqual(buffer[:result], range(result)) 1002 unused = (UNUSED_BYTE,) * (request - result) 1003 self.assertSequenceEqual(buffer[result:], unused) 1004 self.assertEqual(len(reader.avail), avail - result) 1005 1006 def test_close_assert(self): 1007 class R(self.IOBase): 1008 def __setattr__(self, name, value): 1009 pass 1010 def flush(self): 1011 raise OSError() 1012 f = R() 1013 # This would cause an assertion failure. 1014 self.assertRaises(OSError, f.close) 1015 1016 # Silence destructor error 1017 R.flush = lambda self: None 1018 1019 1020class CIOTest(IOTest): 1021 1022 def test_IOBase_finalize(self): 1023 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 1024 # class which inherits IOBase and an object of this class are caught 1025 # in a reference cycle and close() is already in the method cache. 
# (span opens inside CIOTest.test_IOBase_finalize; its def line precedes it)
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference puts the instance in a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # The weak reference is dead only if the cycle was collected.
        self.assertIsNone(wr(), wr)

class PyIOTest(IOTest):
    pass


@support.cpython_only
class APIMismatchTest(unittest.TestCase):

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
                                               ignore=('__weakref__',))
        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')


class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    # The class under test is read from self.tp; the mocks are read from
    # attributes of self as well.

    def test_detach(self):
        # detach() hands back the raw stream; a second detach() raises.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # 42 is what MockRawIOWithoutRead.fileno() returns; presumably
        # MockRawIO shares that implementation.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Each override appends a marker to `record`, so the final list
        # shows which overrides ran and in what order.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
# (span opens inside MyBufferedIO.close() in test_override_destructor)
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        # Expected order: __del__ (1), then close() (2), then flush() (3).
        self.assertEqual(record, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            # The temporary buffered object is dropped while the
            # AttributeError from the bogus attribute is propagating.
            with self.assertRaises(AttributeError):
                self.tp(rawio).xyzzy

            # Whether the failed close() is reported as unraisable depends
            # on the build (see IOBASE_EMITS_UNRAISABLE).
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(cm.unraisable)
            elif cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the class name (module prefix optional) and, once
        # set, the raw stream's name, whether str or bytes.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        b = self.tp(raw)
        # The raw stream's name is the buffered object itself, so repr()
        # recurses; a RuntimeError is an acceptable outcome.
        with support.swap_attr(raw, 'name', b):
            try:
                repr(b)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
# (span opens inside test_flush_error_on_close; its def line precedes it)
        raw = self.MockRawIO()
        closed = []
        def bad_flush():
            # Record the closed state of both objects at flush time.
            closed[:] = [b.closed, raw.closed]
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # Both flush() and close() fail: the close() error is the one
        # raised, with the flush() error chained as its __context__.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

        # Silence destructor error
        raw.close = lambda: None
        b.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same scenario as test_close_error_on_close, but each failure is
        # a NameError triggered by referencing an undefined name.
        raw = self.MockRawIO()
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

        # Silence destructor error
        b.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly without error.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
# (span opens on the last statement of test_multi_close)
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be rebound.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x


class SizeofTest:

    @support.cpython_only
    def test_sizeof(self):
        # Derive the size without the buffer from one buffer size, then
        # check that a second buffer size is accounted for exactly.
        bufsize1 = 4096
        bufsize2 = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize1)
        size = sys.getsizeof(bufio) - bufsize1
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize2)
        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)

    @support.cpython_only
    def test_buffer_freeing(self) :
        # After close(), getsizeof() no longer includes the buffer.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        size = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size)

class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-run on an existing object; zero or negative
        # buffer sizes raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely and
        # rejects read() until __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return all seven bytes.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is the mock's read counter; the assertions pin how
        # many raw reads each read1() call is allowed to trigger.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"", bufio.read1(0))
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)

    def test_read1_arbitrary(self):
        # read1() with no argument or with -1 returns whatever is available.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"bc", bufio.read1())
        self.assertEqual(b"d", bufio.read1())
        self.assertEqual(b"efg", bufio.read1(-1))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1())
        self.assertEqual(rawio._reads, 4)

    def test_readinto(self):
        # The two-byte target is reused, so a short read leaves the
        # previous second byte in place (b"gf", b"cb").
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # A None entry makes the mock's readinto() return None.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # peek() primes the internal buffer; the _reads assertions pin how
        # many raw reads each readinto1() call may trigger.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Target larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def test_readinto_array(self):
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto1_array(self):
        # Same scenario as test_readinto_array, using readinto1().
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto1(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readlines(self):
        # A fresh reader is built for each call because readlines()
        # consumes the stream.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each row: [buffer size, sizes passed to bufio.read(),
        #            expected rawio.read_history]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
# (span opens on the last statement of the loop body in test_buffering)
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        # With nothing buffered and the raw stream blocking, read() yields
        # None; once the stack is exhausted it yields b"".
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The same distinction, checked directly on the raw mock's readall().
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
# (span opens inside the try block of BufferedReaderTest.test_threads)
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(os_helper.TESTFN, "wb") as f:
                f.write(s)
            with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                # Worker body: all 20 threads share the one buffered reader.
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Chunk order is arbitrary, so only per-value counts are
                # compared: each byte value must occur exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            os_helper.unlink(os_helper.TESTFN)

    def test_unseekable(self):
        # tell()/seek() stay unsupported both before and after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek()/tell() on top of MisbehavedRawIO are expected to raise
        # OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

        # Silence destructor error
        bufio.close = lambda: None

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
# (span opens inside test_no_extraneous_read; its def line precedes it)
        bufsize = 16
        # Request sizes below, at and above the buffer size.
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this builds io.BufferedReader directly rather than
        # self.tp, so the C class is exercised regardless of subclass.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)

    def test_truncate_on_read_only(self):
        # A reader is not writable, so truncate() is unsupported with or
        # without an explicit size.
        rawio = self.MockFileIO(b"abc")
        bufio = self.tp(rawio)
        self.assertFalse(bufio.writable())
        self.assertRaises(self.UnsupportedOperation, bufio.truncate)
        self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)


class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
# (span opens inside CBufferedReaderTest.test_constructor)
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # A buffer of sys.maxsize bytes must be rejected; any of the
            # three listed exception types is accepted.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, read() is expected to raise
        # ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
1584 # The Python version has __del__, so it ends into gc.garbage instead 1585 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1586 with warnings_helper.check_warnings(('', ResourceWarning)): 1587 rawio = self.FileIO(os_helper.TESTFN, "w+b") 1588 f = self.tp(rawio) 1589 f.f = f 1590 wr = weakref.ref(f) 1591 del f 1592 support.gc_collect() 1593 self.assertIsNone(wr(), wr) 1594 1595 def test_args_error(self): 1596 # Issue #17275 1597 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1598 self.tp(io.BytesIO(), 1024, 1024, 1024) 1599 1600 def test_bad_readinto_value(self): 1601 rawio = io.BufferedReader(io.BytesIO(b"12")) 1602 rawio.readinto = lambda buf: -1 1603 bufio = self.tp(rawio) 1604 with self.assertRaises(OSError) as cm: 1605 bufio.readline() 1606 self.assertIsNone(cm.exception.__cause__) 1607 1608 def test_bad_readinto_type(self): 1609 rawio = io.BufferedReader(io.BytesIO(b"12")) 1610 rawio.readinto = lambda buf: b'' 1611 bufio = self.tp(rawio) 1612 with self.assertRaises(OSError) as cm: 1613 bufio.readline() 1614 self.assertIsInstance(cm.exception.__cause__, TypeError) 1615 1616 1617class PyBufferedReaderTest(BufferedReaderTest): 1618 tp = pyio.BufferedReader 1619 1620 1621class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1622 write_mode = "wb" 1623 1624 def test_constructor(self): 1625 rawio = self.MockRawIO() 1626 bufio = self.tp(rawio) 1627 bufio.__init__(rawio) 1628 bufio.__init__(rawio, buffer_size=1024) 1629 bufio.__init__(rawio, buffer_size=16) 1630 self.assertEqual(3, bufio.write(b"abc")) 1631 bufio.flush() 1632 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1633 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1634 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1635 bufio.__init__(rawio) 1636 self.assertEqual(3, bufio.write(b"ghi")) 1637 bufio.flush() 1638 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1639 1640 def test_uninitialized(self): 1641 
# (span opens inside BufferedWriterTest.test_uninitialized)
        # An object created with __new__ only can be deleted safely and
        # rejects write() until __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # Buffered data reaches the raw stream when detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        # Mutating the caller's bytearray afterwards must not change what
        # gets flushed.
        buffer[:] = b"***" # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        # 16 bytes are written in 3-byte slices through an 8-byte buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
# (span opens inside check_writes; its def line precedes it)
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it does not run past the data.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            # Caller-supplied hook, run after every write.
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        # Baseline: nothing happens between writes.
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        # An explicit flush() after every write.
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeks that end up back at the current position, once using
        # absolute offsets and once using relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        # truncate() at the current position after every write.
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

# (span opens inside test_write_non_blocking, after the blocked write)
        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seek back, overwrite the first two bytes, then append at the end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        # The overwrite is already visible in the raw stream at this point.
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() pushes buffered data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        # writelines() accepts a list of bytes objects.
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() also accepts a UserList rather than a real list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Ints, None and a str are all rejected with TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference flushes pending data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
# (span opens inside BufferedWriterTest.test_truncate)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position stays at 6 even though the file now has 3 bytes.
            self.assertEqual(bufio.tell(), 6)
        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                # (buffer_size + 1) bytes written plus 1 byte read.
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-slice the payload into alternating 1- and 19-byte chunks
            # that the worker threads pop from.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
# (span opens inside the try block of BufferedWriterTest.test_threads)
            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                # Worker body: drain the shared queue into the one shared
                # buffered writer until the queue is empty.
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(os_helper.TESTFN, "rb") as f:
                s = f.read()
            # Write order is arbitrary, so only per-value counts are
            # compared: each byte value must occur exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            os_helper.unlink(os_helper.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write() on top of
        # MisbehavedRawIO are expected to raise OSError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # The raw write() fails when close() flushes; the error propagates
        # and the object still ends up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        # Wait for the mock's in_flush event before writing from this
        # thread; presumably it is set once the closing thread is in flush().
        rawio.in_flush.wait()
        self.assertRaises(ValueError, bufio.write, b'spam')
        self.assertTrue(bufio.closed)
        t.join()



class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # A buffer of sys.maxsize bytes must be rejected; any of the
            # three listed exception types is accepted.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, write() is expected to raise
        # ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
# (span opens inside CBufferedWriterTest.test_garbage_collection)
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(os_helper.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Self-reference puts the object in a reference cycle.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        # The data written before collection must be on disk.
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)


class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter

class BufferedRWPairTest(unittest.TestCase):

    def test_constructor(self):
        # A freshly built pair is open.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely and
        # rejects read()/write() until __init__ has run.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # NOTE(review): subclasses the module-level MockRawIO rather than
        # self.MockRawIO, unlike the rest of this class.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # NOTE(review): subclasses the module-level MockRawIO rather than
        # self.MockRawIO, unlike the rest of this class.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        # Sized reads, a read-to-end, and read(None) on a fresh pair.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # `pair` is a factory: each call builds a fresh pair because
        # readlines() consumes the stream.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")
        self.assertEqual(pair.read1(), b"def")

    def test_readinto(self):
        # Both readinto variants fill a 5-byte bytes-like target.
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        pair.flush()
        # Only the writer side receives data, one entry per flush.
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() does not consume: the following read() returns the same
        # bytes.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() raises NameError by referencing an undefined
        # name; the writer is still closed and the pair reports closed.
        def reader_close():
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

        # Silence destructor error
        reader.close = lambda: None

    def test_writer_close_error_on_close(self):
        # Mirror case: the writer's close() raises; the reader is still
        # closed but the pair reports not closed.
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
# Ignore BufferedRWPair unraisable exception 2119 with support.catch_unraisable_exception(): 2120 pair = None 2121 support.gc_collect() 2122 support.gc_collect() 2123 2124 def test_reader_writer_close_error_on_close(self): 2125 def reader_close(): 2126 reader_non_existing 2127 def writer_close(): 2128 writer_non_existing 2129 reader = self.MockRawIO() 2130 reader.close = reader_close 2131 writer = self.MockRawIO() 2132 writer.close = writer_close 2133 pair = self.tp(reader, writer) 2134 with self.assertRaises(NameError) as err: 2135 pair.close() 2136 self.assertIn('reader_non_existing', str(err.exception)) 2137 self.assertIsInstance(err.exception.__context__, NameError) 2138 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2139 self.assertFalse(pair.closed) 2140 self.assertFalse(reader.closed) 2141 self.assertFalse(writer.closed) 2142 2143 # Silence destructor error 2144 reader.close = lambda: None 2145 writer.close = lambda: None 2146 2147 def test_isatty(self): 2148 class SelectableIsAtty(MockRawIO): 2149 def __init__(self, isatty): 2150 MockRawIO.__init__(self) 2151 self._isatty = isatty 2152 2153 def isatty(self): 2154 return self._isatty 2155 2156 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2157 self.assertFalse(pair.isatty()) 2158 2159 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2160 self.assertTrue(pair.isatty()) 2161 2162 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2163 self.assertTrue(pair.isatty()) 2164 2165 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2166 self.assertTrue(pair.isatty()) 2167 2168 def test_weakref_clearing(self): 2169 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2170 ref = weakref.ref(brw) 2171 brw = None 2172 ref = None # Shouldn't segfault. 

class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPair tests to the C implementation.
    tp = io.BufferedRWPair

class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPair tests to the Python implementation.
    tp = pyio.BufferedRWPair


class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for a buffered object that is read and written through one handle.

    The concrete class under test is provided by subclasses as ``tp``.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both parents define this test; run each version explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        # Both parents define this test; run each version explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(mock_raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for payload in (b"ddd", b"eee"):
            stream.write(payload)
        # While the writes are only buffered, the raw stream has seen nothing.
        self.assertFalse(mock_raw._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        # After the read, both writes are expected as one combined chunk.
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite the four bytes that follow the ones just read.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # whence=2: relative to the end; whence=1: relative to the
        # current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset must be rejected.
        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is called
        # as read_func(bufio[, n]) and must return the bytes it consumed.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream directly, bypassing the buffered object.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(stream, *size):
            return stream.read(*size)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(stream, size=-1):
            # A negative size means "read everything": use a large buffer.
            capacity = 9999 if size < 0 else size
            target = bytearray(capacity)
            filled = stream.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(stream, size=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = stream.peek(size)
            if size != -1:
                data = data[:size]
            # peek() does not advance the position, so do it by hand.
            stream.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        # Two separate write + flush rounds.
        for payload in (b"123", b"45"):
            stream.write(payload)
            stream.flush()
        stream.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, stream.read())

    def test_threads(self):
        # Both parents define this test; run each version explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    # The test_writes_and_* tests hand a callback to check_writes(), which is
    # inherited and not defined in this class.

    def test_writes_and_peek(self):
        def _peek_in_place(stream):
            stream.peek(1)

        def _peek_previous_byte(stream):
            # Step back one byte, peek, then restore the position.
            saved = stream.tell()
            stream.seek(-1, 1)
            stream.peek(1)
            stream.seek(saved, 0)

        self.check_writes(_peek_in_place)
        self.check_writes(_peek_previous_byte)

    def test_writes_and_reads(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_read1s(self):
        def _reread1_last_byte(stream):
            stream.seek(-1, 1)
            stream.read1(1)
        self.check_writes(_reread1_last_byte)

    def test_writes_and_readintos(self):
        def _readinto_last_byte(stream):
            stream.seek(-1, 1)
            stream.readinto(bytearray(1))
        self.check_writes(_readinto_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            position_after_write = overwrite_size + 1
            self.assertEqual(stream.tell(), position_after_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), position_after_write)
            expected = (b"A"
                        + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(stream, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            stream.seek(pos1)
            stream.read(pos2 - pos1)
            stream.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            stream.seek(pos1)
            stream.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(size):
            for second in range(first, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, first, second)
                stream.flush()
                expected = bytearray(original)
                # Same order as the writes: when both positions coincide,
                # the later write (1) wins.
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        # Both parents define this test; run each version explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing, \
                self.tp(backing, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing, \
                self.tp(backing, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
                self.tp(backing) as stream:
            # Each write replaces the first byte of the next line; readline()
            # must then return the rest of that line.
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point to test it over
    # a writable stream.
    test_truncate_on_read_only = None


class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Binds the BufferedRandom tests to the C implementation.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        mock_raw = self.MockRawIO()
        stream = self.tp(mock_raw)
        # Re-initialising with a huge buffer size must raise, not crash.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            stream.__init__(mock_raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the C reader and C writer versions of this test in turn.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        surplus_args = (1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), *surplus_args)


class PyBufferedRandomTest(BufferedRandomTest):
    # Binds the BufferedRandom tests to the Python implementation.
    tp = pyio.BufferedRandom


# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
2453 2454class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2455 """ 2456 For testing seek/tell behavior with a stateful, buffering decoder. 2457 2458 Input is a sequence of words. Words may be fixed-length (length set 2459 by input) or variable-length (period-terminated). In variable-length 2460 mode, extra periods are ignored. Possible words are: 2461 - 'i' followed by a number sets the input length, I (maximum 99). 2462 When I is set to 0, words are period-terminated. 2463 - 'o' followed by a number sets the output length, O (maximum 99). 2464 - Any other word is converted into a word followed by a period on 2465 the output. The output word consists of the input word truncated 2466 or padded out with hyphens to make its length equal to O. If O 2467 is 0, the word is output verbatim without truncating or padding. 2468 I and O are initially set to 1. When I changes, any buffered input is 2469 re-scanned according to the new I. EOF also terminates the last word. 2470 """ 2471 2472 def __init__(self, errors='strict'): 2473 codecs.IncrementalDecoder.__init__(self, errors) 2474 self.reset() 2475 2476 def __repr__(self): 2477 return '<SID %x>' % id(self) 2478 2479 def reset(self): 2480 self.i = 1 2481 self.o = 1 2482 self.buffer = bytearray() 2483 2484 def getstate(self): 2485 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2486 return bytes(self.buffer), i*100 + o 2487 2488 def setstate(self, state): 2489 buffer, io = state 2490 self.buffer = bytearray(buffer) 2491 i, o = divmod(io, 100) 2492 self.i, self.o = i ^ 1, o ^ 1 2493 2494 def decode(self, input, final=False): 2495 output = '' 2496 for b in input: 2497 if self.i == 0: # variable-length, terminated with period 2498 if b == ord('.'): 2499 if self.buffer: 2500 output += self.process_word() 2501 else: 2502 self.buffer.append(b) 2503 else: # fixed-length, terminate after self.i bytes 2504 self.buffer.append(b) 2505 if len(self.buffer) == self.i: 2506 output += self.process_word() 2507 if
final and self.buffer: # EOF terminates the last word 2508 output += self.process_word() 2509 return output 2510 2511 def process_word(self): 2512 output = '' 2513 if self.buffer[0] == ord('i'): 2514 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2515 elif self.buffer[0] == ord('o'): 2516 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2517 else: 2518 output = self.buffer.decode('ascii') 2519 if len(output) < self.o: 2520 output += '-'*self.o # pad out with hyphens 2521 if self.o: 2522 output = output[:self.o] # truncate to output length 2523 output += '.' 2524 self.buffer = bytearray() 2525 return output 2526 2527 codecEnabled = False 2528 2529 2530# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak 2531# when registering codecs and cleanup functions. 2532def lookupTestDecoder(name): 2533 if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder': 2534 latin1 = codecs.lookup('latin-1') 2535 return codecs.CodecInfo( 2536 name='test_decoder', encode=latin1.encode, decode=None, 2537 incrementalencoder=None, 2538 streamreader=None, streamwriter=None, 2539 incrementaldecoder=StatefulIncrementalDecoder) 2540 2541 2542class StatefulIncrementalDecoderTest(unittest.TestCase): 2543 """ 2544 Make sure the StatefulIncrementalDecoder actually works. 
2545 """ 2546 2547 test_cases = [ 2548 # I=1, O=1 (fixed-length input == fixed-length output) 2549 (b'abcd', False, 'a.b.c.d.'), 2550 # I=0, O=0 (variable-length input, variable-length output) 2551 (b'oiabcd', True, 'abcd.'), 2552 # I=0, O=0 (should ignore extra periods) 2553 (b'oi...abcd...', True, 'abcd.'), 2554 # I=0, O=6 (variable-length input, fixed-length output) 2555 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2556 # I=2, O=6 (fixed-length input < fixed-length output) 2557 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2558 # I=6, O=3 (fixed-length input > fixed-length output) 2559 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2560 # I=0, then 3; O=29, then 15 (with longer output) 2561 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2562 'a----------------------------.' + 2563 'b----------------------------.' + 2564 'cde--------------------------.' + 2565 'abcdefghijabcde.' + 2566 'a.b------------.' + 2567 '.c.------------.' + 2568 'd.e------------.' + 2569 'k--------------.' + 2570 'l--------------.' + 2571 'm--------------.') 2572 ] 2573 2574 def test_decoder(self): 2575 # Try a few one-shot test cases. 2576 for input, eof, output in self.test_cases: 2577 d = StatefulIncrementalDecoder() 2578 self.assertEqual(d.decode(input, eof), output) 2579 2580 # Also test an unfinished decode, followed by forcing EOF. 
2581 d = StatefulIncrementalDecoder() 2582 self.assertEqual(d.decode(b'oiabcd'), '') 2583 self.assertEqual(d.decode(b'', 1), 'abcd.') 2584 2585class TextIOWrapperTest(unittest.TestCase): 2586 2587 def setUp(self): 2588 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2589 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2590 os_helper.unlink(os_helper.TESTFN) 2591 codecs.register(lookupTestDecoder) 2592 self.addCleanup(codecs.unregister, lookupTestDecoder) 2593 2594 def tearDown(self): 2595 os_helper.unlink(os_helper.TESTFN) 2596 2597 def test_constructor(self): 2598 r = self.BytesIO(b"\xc3\xa9\n\n") 2599 b = self.BufferedReader(r, 1000) 2600 t = self.TextIOWrapper(b, encoding="utf-8") 2601 t.__init__(b, encoding="latin-1", newline="\r\n") 2602 self.assertEqual(t.encoding, "latin-1") 2603 self.assertEqual(t.line_buffering, False) 2604 t.__init__(b, encoding="utf-8", line_buffering=True) 2605 self.assertEqual(t.encoding, "utf-8") 2606 self.assertEqual(t.line_buffering, True) 2607 self.assertEqual("\xe9\n", t.readline()) 2608 self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42) 2609 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') 2610 2611 def test_uninitialized(self): 2612 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2613 del t 2614 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2615 self.assertRaises(Exception, repr, t) 2616 self.assertRaisesRegex((ValueError, AttributeError), 2617 'uninitialized|has no attribute', 2618 t.read, 0) 2619 t.__init__(self.MockRawIO(), encoding="utf-8") 2620 self.assertEqual(t.read(0), '') 2621 2622 def test_non_text_encoding_codecs_are_rejected(self): 2623 # Ensure the constructor complains if passed a codec that isn't 2624 # marked as a text encoding 2625 # http://bugs.python.org/issue20404 2626 r = self.BytesIO() 2627 b = self.BufferedWriter(r) 2628 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2629 self.TextIOWrapper(b, 
encoding="hex") 2630 2631 def test_detach(self): 2632 r = self.BytesIO() 2633 b = self.BufferedWriter(r) 2634 t = self.TextIOWrapper(b, encoding="ascii") 2635 self.assertIs(t.detach(), b) 2636 2637 t = self.TextIOWrapper(b, encoding="ascii") 2638 t.write("howdy") 2639 self.assertFalse(r.getvalue()) 2640 t.detach() 2641 self.assertEqual(r.getvalue(), b"howdy") 2642 self.assertRaises(ValueError, t.detach) 2643 2644 # Operations independent of the detached stream should still work 2645 repr(t) 2646 self.assertEqual(t.encoding, "ascii") 2647 self.assertEqual(t.errors, "strict") 2648 self.assertFalse(t.line_buffering) 2649 self.assertFalse(t.write_through) 2650 2651 def test_repr(self): 2652 raw = self.BytesIO("hello".encode("utf-8")) 2653 b = self.BufferedReader(raw) 2654 t = self.TextIOWrapper(b, encoding="utf-8") 2655 modname = self.TextIOWrapper.__module__ 2656 self.assertRegex(repr(t), 2657 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) 2658 raw.name = "dummy" 2659 self.assertRegex(repr(t), 2660 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2661 t.mode = "r" 2662 self.assertRegex(repr(t), 2663 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2664 raw.name = b"dummy" 2665 self.assertRegex(repr(t), 2666 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2667 2668 t.buffer.detach() 2669 repr(t) # Should not raise an exception 2670 2671 def test_recursive_repr(self): 2672 # Issue #25455 2673 raw = self.BytesIO() 2674 t = self.TextIOWrapper(raw, encoding="utf-8") 2675 with support.swap_attr(raw, 'name', t): 2676 try: 2677 repr(t) # Should not crash 2678 except RuntimeError: 2679 pass 2680 2681 def test_line_buffering(self): 2682 r = self.BytesIO() 2683 b = self.BufferedWriter(r, 1000) 2684 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) 2685 t.write("X") 2686 self.assertEqual(r.getvalue(), b"") # No flush happened 2687 t.write("Y\nZ") 2688 
self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2689 t.write("A\rB") 2690 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2691 2692 def test_reconfigure_line_buffering(self): 2693 r = self.BytesIO() 2694 b = self.BufferedWriter(r, 1000) 2695 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) 2696 t.write("AB\nC") 2697 self.assertEqual(r.getvalue(), b"") 2698 2699 t.reconfigure(line_buffering=True) # implicit flush 2700 self.assertEqual(r.getvalue(), b"AB\nC") 2701 t.write("DEF\nG") 2702 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2703 t.write("H") 2704 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2705 t.reconfigure(line_buffering=False) # implicit flush 2706 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2707 t.write("IJ") 2708 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2709 2710 # Keeping default value 2711 t.reconfigure() 2712 t.reconfigure(line_buffering=None) 2713 self.assertEqual(t.line_buffering, False) 2714 t.reconfigure(line_buffering=True) 2715 t.reconfigure() 2716 t.reconfigure(line_buffering=None) 2717 self.assertEqual(t.line_buffering, True) 2718 2719 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2720 def test_default_encoding(self): 2721 old_environ = dict(os.environ) 2722 try: 2723 # try to get a user preferred encoding different than the current 2724 # locale encoding to check that TextIOWrapper() uses the current 2725 # locale encoding and not the user preferred encoding 2726 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2727 if key in os.environ: 2728 del os.environ[key] 2729 2730 current_locale_encoding = locale.getpreferredencoding(False) 2731 b = self.BytesIO() 2732 with warnings.catch_warnings(): 2733 warnings.simplefilter("ignore", EncodingWarning) 2734 t = self.TextIOWrapper(b) 2735 self.assertEqual(t.encoding, current_locale_encoding) 2736 finally: 2737 os.environ.clear() 2738 os.environ.update(old_environ) 2739 2740 @support.cpython_only 2741 @unittest.skipIf(sys.flags.utf8_mode, 
"utf-8 mode is enabled") 2742 def test_device_encoding(self): 2743 # Issue 15989 2744 import _testcapi 2745 b = self.BytesIO() 2746 b.fileno = lambda: _testcapi.INT_MAX + 1 2747 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale") 2748 b.fileno = lambda: _testcapi.UINT_MAX + 1 2749 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale") 2750 2751 def test_encoding(self): 2752 # Check the encoding attribute is always set, and valid 2753 b = self.BytesIO() 2754 t = self.TextIOWrapper(b, encoding="utf-8") 2755 self.assertEqual(t.encoding, "utf-8") 2756 with warnings.catch_warnings(): 2757 warnings.simplefilter("ignore", EncodingWarning) 2758 t = self.TextIOWrapper(b) 2759 self.assertIsNotNone(t.encoding) 2760 codecs.lookup(t.encoding) 2761 2762 def test_encoding_errors_reading(self): 2763 # (1) default 2764 b = self.BytesIO(b"abc\n\xff\n") 2765 t = self.TextIOWrapper(b, encoding="ascii") 2766 self.assertRaises(UnicodeError, t.read) 2767 # (2) explicit strict 2768 b = self.BytesIO(b"abc\n\xff\n") 2769 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2770 self.assertRaises(UnicodeError, t.read) 2771 # (3) ignore 2772 b = self.BytesIO(b"abc\n\xff\n") 2773 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2774 self.assertEqual(t.read(), "abc\n\n") 2775 # (4) replace 2776 b = self.BytesIO(b"abc\n\xff\n") 2777 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2778 self.assertEqual(t.read(), "abc\n\ufffd\n") 2779 2780 def test_encoding_errors_writing(self): 2781 # (1) default 2782 b = self.BytesIO() 2783 t = self.TextIOWrapper(b, encoding="ascii") 2784 self.assertRaises(UnicodeError, t.write, "\xff") 2785 # (2) explicit strict 2786 b = self.BytesIO() 2787 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2788 self.assertRaises(UnicodeError, t.write, "\xff") 2789 # (3) ignore 2790 b = self.BytesIO() 2791 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2792 newline="\n") 2793 
t.write("abc\xffdef\n") 2794 t.flush() 2795 self.assertEqual(b.getvalue(), b"abcdef\n") 2796 # (4) replace 2797 b = self.BytesIO() 2798 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2799 newline="\n") 2800 t.write("abc\xffdef\n") 2801 t.flush() 2802 self.assertEqual(b.getvalue(), b"abc?def\n") 2803 2804 def test_newlines(self): 2805 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2806 2807 tests = [ 2808 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2809 [ '', input_lines ], 2810 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2811 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2812 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2813 ] 2814 encodings = ( 2815 'utf-8', 'latin-1', 2816 'utf-16', 'utf-16-le', 'utf-16-be', 2817 'utf-32', 'utf-32-le', 'utf-32-be', 2818 ) 2819 2820 # Try a range of buffer sizes to test the case where \r is the last 2821 # character in TextIOWrapper._pending_line. 2822 for encoding in encodings: 2823 # XXX: str.encode() should return bytes 2824 data = bytes(''.join(input_lines).encode(encoding)) 2825 for do_reads in (False, True): 2826 for bufsize in range(1, 10): 2827 for newline, exp_lines in tests: 2828 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2829 textio = self.TextIOWrapper(bufio, newline=newline, 2830 encoding=encoding) 2831 if do_reads: 2832 got_lines = [] 2833 while True: 2834 c2 = textio.read(2) 2835 if c2 == '': 2836 break 2837 self.assertEqual(len(c2), 2) 2838 got_lines.append(c2 + textio.readline()) 2839 else: 2840 got_lines = list(textio) 2841 2842 for got_line, exp_line in zip(got_lines, exp_lines): 2843 self.assertEqual(got_line, exp_line) 2844 self.assertEqual(len(got_lines), len(exp_lines)) 2845 2846 def test_newlines_input(self): 2847 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2848 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2849 for newline, expected in [ 2850 (None, 
normalized.decode("ascii").splitlines(keepends=True)), 2851 ("", testdata.decode("ascii").splitlines(keepends=True)), 2852 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2853 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2854 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2855 ]: 2856 buf = self.BytesIO(testdata) 2857 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2858 self.assertEqual(txt.readlines(), expected) 2859 txt.seek(0) 2860 self.assertEqual(txt.read(), "".join(expected)) 2861 2862 def test_newlines_output(self): 2863 testdict = { 2864 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2865 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2866 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2867 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2868 } 2869 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2870 for newline, expected in tests: 2871 buf = self.BytesIO() 2872 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2873 txt.write("AAA\nB") 2874 txt.write("BB\nCCC\n") 2875 txt.write("X\rY\r\nZ") 2876 txt.flush() 2877 self.assertEqual(buf.closed, False) 2878 self.assertEqual(buf.getvalue(), expected) 2879 2880 def test_destructor(self): 2881 l = [] 2882 base = self.BytesIO 2883 class MyBytesIO(base): 2884 def close(self): 2885 l.append(self.getvalue()) 2886 base.close(self) 2887 b = MyBytesIO() 2888 t = self.TextIOWrapper(b, encoding="ascii") 2889 t.write("abc") 2890 del t 2891 support.gc_collect() 2892 self.assertEqual([b"abc"], l) 2893 2894 def test_override_destructor(self): 2895 record = [] 2896 class MyTextIO(self.TextIOWrapper): 2897 def __del__(self): 2898 record.append(1) 2899 try: 2900 f = super().__del__ 2901 except AttributeError: 2902 pass 2903 else: 2904 f() 2905 def close(self): 2906 record.append(2) 2907 super().close() 2908 def flush(self): 2909 record.append(3) 2910 super().flush() 2911 b = self.BytesIO() 2912 t = MyTextIO(b, encoding="ascii") 2913 del t 2914 
support.gc_collect() 2915 self.assertEqual(record, [1, 2, 3]) 2916 2917 def test_error_through_destructor(self): 2918 # Test that the exception state is not modified by a destructor, 2919 # even if close() fails. 2920 rawio = self.CloseFailureIO() 2921 with support.catch_unraisable_exception() as cm: 2922 with self.assertRaises(AttributeError): 2923 self.TextIOWrapper(rawio, encoding="utf-8").xyzzy 2924 2925 if not IOBASE_EMITS_UNRAISABLE: 2926 self.assertIsNone(cm.unraisable) 2927 elif cm.unraisable is not None: 2928 self.assertEqual(cm.unraisable.exc_type, OSError) 2929 2930 # Systematic tests of the text I/O API 2931 2932 def test_basic_io(self): 2933 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2934 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2935 f = self.open(os_helper.TESTFN, "w+", encoding=enc) 2936 f._CHUNK_SIZE = chunksize 2937 self.assertEqual(f.write("abc"), 3) 2938 f.close() 2939 f = self.open(os_helper.TESTFN, "r+", encoding=enc) 2940 f._CHUNK_SIZE = chunksize 2941 self.assertEqual(f.tell(), 0) 2942 self.assertEqual(f.read(), "abc") 2943 cookie = f.tell() 2944 self.assertEqual(f.seek(0), 0) 2945 self.assertEqual(f.read(None), "abc") 2946 f.seek(0) 2947 self.assertEqual(f.read(2), "ab") 2948 self.assertEqual(f.read(1), "c") 2949 self.assertEqual(f.read(1), "") 2950 self.assertEqual(f.read(), "") 2951 self.assertEqual(f.tell(), cookie) 2952 self.assertEqual(f.seek(0), 0) 2953 self.assertEqual(f.seek(0, 2), cookie) 2954 self.assertEqual(f.write("def"), 3) 2955 self.assertEqual(f.seek(cookie), cookie) 2956 self.assertEqual(f.read(), "def") 2957 if enc.startswith("utf"): 2958 self.multi_line_test(f, enc) 2959 f.close() 2960 2961 def multi_line_test(self, f, enc): 2962 f.seek(0) 2963 f.truncate() 2964 sample = "s\xff\u0fff\uffff" 2965 wlines = [] 2966 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2967 chars = [] 2968 for i in range(size): 2969 chars.append(sample[i % len(sample)]) 
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a "with" block so the handle is closed
        # even when one of the assertions below fails.
        with self.open(os_helper.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(os_helper.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(os_helper.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.  ("data" rather than "input", to avoid
        # shadowing the builtin.)
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
def test_read_one_by_one(self):
    # Read a CRLF-containing stream one character at a time and check the
    # concatenated result.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
    # iter() with a sentinel keeps calling read(1) until it returns "".
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
def test_issue1395_3(self):
    # Two fixed-size reads followed by three readline() calls, with a
    # 4-character chunk size; the pieces must add up to the normalized text.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    pieces = [txt.read(4), txt.read(4)]
    pieces.extend(txt.readline() for _ in range(3))
    self.assertEqual("".join(pieces), self.normalized)
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        # Create the file; leaving the "with" block flushes and closes it.
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        # Appending must produce the same bytes as encoding the whole
        # text in one go, i.e. exactly one BOM at the start.
        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
def test_errors_property(self):
    # The "errors" attribute reports "strict" when no handler is passed
    # and echoes the handler name when one is given.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(os_helper.TESTFN, "w", encoding="utf-8",
                       **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must raise
    # the close error with the flush error chained as its __context__.
    def bad_flush():
        raise OSError('flush')

    def bad_close():
        raise OSError('close')

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    close_error = caught.exception
    self.assertEqual(close_error.args, ('close',))
    self.assertIsInstance(close_error.__context__, OSError)
    self.assertEqual(close_error.__context__.args, ('flush',))
    self.assertFalse(wrapper.closed)

    # Silence destructor error
    raw.close = lambda: None
    wrapper.flush = lambda: None
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks),
                             encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    # Iterating the wrapper yields the remaining lines.
    remaining = [line for line in txt]
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
def test_reconfigure_write_through(self):
    # Toggle write_through with reconfigure() and check both the attribute
    # and what has reached the raw stream at each step.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw stream has received so far.
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')

    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')

    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    #
    # The wrapper is built from self.TextIOWrapper / self.BytesIO, like the
    # other tests here, so that each implementation under test is exercised
    # instead of always the C one from the "io" module.
    rot13 = codecs.lookup("rot13")
    # Temporarily mark rot13 as a text encoding so that the constructor
    # accepts it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose script keeps an object alive until
    # exit; that object's __del__ builds a TextIOWrapper with **kwargs
    # and prints "ok".  Returns whatever assert_python_ok() returns.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
def test_reconfigure_encoding_read(self):
    # latin1 -> utf8
    # (latin1 can decode utf-8 encoded string)
    payload = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
    txt = self.TextIOWrapper(self.BytesIO(payload),
                             encoding='latin1', newline='\n')
    self.assertEqual(txt.readline(), 'abc\xe9\n')
    # After the readline() above, changing either the encoding or the
    # newline mode is expected to raise UnsupportedOperation.
    for change in ({'encoding': 'utf-8'}, {'newline': None}):
        with self.assertRaises(self.UnsupportedOperation):
            txt.reconfigure(**change)
def test_reconfigure_write_non_seekable(self):
    # Switch ascii -> utf-8-sig on a stream that reports itself as
    # non-seekable, then inspect the bytes written.
    stream = self.BytesIO()
    stream.seekable = lambda: False
    stream.seek = None
    wrapper = self.TextIOWrapper(stream, encoding='ascii', newline='\n')
    wrapper.write('abc\n')
    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('d\xe9f\n')
    wrapper.flush()

    # If the raw stream is not seekable, there'll be a BOM
    expected = b'abc\n\xef\xbb\xbfd\xc3\xa9f\n'
    self.assertEqual(stream.getvalue(), expected)
def test_reconfigure_newline(self):
    # Reading: (raw bytes, initial newline, reconfigured newline,
    # expected result of the first readline()).
    read_cases = (
        (b'CR\rEOF', '\n', None, 'CR\n'),
        (b'CR\rEOF', '\n', '', 'CR\r'),
        (b'CR\rLF\nEOF', '\r', '\n', 'CR\rLF\n'),
        (b'LF\nCR\rEOF', '\n', '\r', 'LF\nCR\r'),
        (b'CR\rCRLF\r\nEOF', '\r', '\r\n', 'CR\rCRLF\r\n'),
    )
    for data, initial, changed, first_line in read_cases:
        raw = self.BytesIO(data)
        txt = self.TextIOWrapper(raw, 'ascii', newline=initial)
        txt.reconfigure(newline=changed)
        self.assertEqual(txt.readline(), first_line)

    # Writing: reconfigure before each write, then compare the bytes
    # accumulated in the underlying buffer.
    write_steps = (
        (None, 'linesep\n'),
        ('', 'LF\n'),
        ('\n', 'LF\n'),
        ('\r', 'CR\n'),
        ('\r\n', 'CRLF\n'),
    )
    txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
    for newline, text in write_steps:
        txt.reconfigure(newline=newline)
        txt.write(text)
    expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
    self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3702 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3703 t.read(1) 3704 t.read() 3705 t.tell() 3706 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3707 t.read(1) 3708 t.write('x') 3709 t.tell() 3710 3711 3712class MemviewBytesIO(io.BytesIO): 3713 '''A BytesIO object whose read method returns memoryviews 3714 rather than bytes''' 3715 3716 def read1(self, len_): 3717 return _to_memoryview(super().read1(len_)) 3718 3719 def read(self, len_): 3720 return _to_memoryview(super().read(len_)) 3721 3722def _to_memoryview(buf): 3723 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3724 3725 arr = array.array('i') 3726 idx = len(buf) - len(buf) % arr.itemsize 3727 arr.frombytes(buf[:idx]) 3728 return memoryview(arr) 3729 3730 3731class CTextIOWrapperTest(TextIOWrapperTest): 3732 io = io 3733 shutdown_error = "LookupError: unknown encoding: ascii" 3734 3735 def test_initialization(self): 3736 r = self.BytesIO(b"\xc3\xa9\n\n") 3737 b = self.BufferedReader(r, 1000) 3738 t = self.TextIOWrapper(b, encoding="utf-8") 3739 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') 3740 self.assertRaises(ValueError, t.read) 3741 3742 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3743 self.assertRaises(Exception, repr, t) 3744 3745 def test_garbage_collection(self): 3746 # C TextIOWrapper objects are collected, and collecting them flushes 3747 # all data to disk. 3748 # The Python version has __del__, so it ends in gc.garbage instead. 
def test_del__CHUNK_SIZE_SystemError(self):
    # Deleting the _CHUNK_SIZE attribute must raise AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
    # delattr() performs the same operation as "del wrapper._CHUNK_SIZE".
    self.assertRaises(AttributeError, delattr, wrapper, '_CHUNK_SIZE')
# Runs the TextIOWrapperTest cases with the "io" attribute bound to pyio,
# the module imported at the top of the file as the Python implementation.
class PyTextIOWrapperTest(TextIOWrapperTest):
    io = pyio
    # Substring that test_create_at_shutdown_without_encoding looks for in
    # the child's stderr when the child reports an error.
    shutdown_error = "LookupError: unknown encoding: ascii"
def test_newline_decoder(self):
    encodings = (
        # None meaning the IncrementalNewlineDecoder takes unicode input
        # rather than bytes input
        None, 'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for enc in encodings:
        if enc:
            inner = codecs.getincrementaldecoder(enc)()
        else:
            # No byte decoder: pass the (falsy) value straight through.
            inner = enc
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding(wrapped, enc)

    # UTF-8 specific checks, plus rejection of a non-tuple state.
    utf8_inner = codecs.getincrementaldecoder("utf-8")()
    wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
    self.check_newline_decoding_utf8(wrapped)
    self.assertRaises(TypeError, wrapped.setstate, 42)
self.assertEqual(dec.newlines, None) 3914 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3915 self.assertEqual(dec.newlines, None) 3916 dec = self.IncrementalNewlineDecoder(None, translate=False) 3917 _check(dec) 3918 dec = self.IncrementalNewlineDecoder(None, translate=True) 3919 _check(dec) 3920 3921 def test_translate(self): 3922 # issue 35062 3923 for translate in (-2, -1, 1, 2): 3924 decoder = codecs.getincrementaldecoder("utf-8")() 3925 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3926 self.check_newline_decoding_utf8(decoder) 3927 decoder = codecs.getincrementaldecoder("utf-8")() 3928 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3929 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3930 3931class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3932 pass 3933 3934class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3935 pass 3936 3937 3938# XXX Tests for open() 3939 3940class MiscIOTest(unittest.TestCase): 3941 3942 def tearDown(self): 3943 os_helper.unlink(os_helper.TESTFN) 3944 3945 def test___all__(self): 3946 for name in self.io.__all__: 3947 obj = getattr(self.io, name, None) 3948 self.assertIsNotNone(obj, name) 3949 if name in ("open", "open_code"): 3950 continue 3951 elif "error" in name.lower() or name == "UnsupportedOperation": 3952 self.assertTrue(issubclass(obj, Exception), name) 3953 elif not name.startswith("SEEK_"): 3954 self.assertTrue(issubclass(obj, self.IOBase)) 3955 3956 def test_attributes(self): 3957 f = self.open(os_helper.TESTFN, "wb", buffering=0) 3958 self.assertEqual(f.mode, "wb") 3959 f.close() 3960 3961 with warnings_helper.check_warnings(('', DeprecationWarning)): 3962 f = self.open(os_helper.TESTFN, "U", encoding="utf-8") 3963 self.assertEqual(f.name, os_helper.TESTFN) 3964 self.assertEqual(f.buffer.name, os_helper.TESTFN) 3965 self.assertEqual(f.buffer.raw.name, os_helper.TESTFN) 3966 self.assertEqual(f.mode, "U") 3967 self.assertEqual(f.buffer.mode, "rb") 
3968 self.assertEqual(f.buffer.raw.mode, "rb") 3969 f.close() 3970 3971 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") 3972 self.assertEqual(f.mode, "w+") 3973 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3974 self.assertEqual(f.buffer.raw.mode, "rb+") 3975 3976 g = self.open(f.fileno(), "wb", closefd=False) 3977 self.assertEqual(g.mode, "wb") 3978 self.assertEqual(g.raw.mode, "wb") 3979 self.assertEqual(g.name, f.fileno()) 3980 self.assertEqual(g.raw.name, f.fileno()) 3981 f.close() 3982 g.close() 3983 3984 def test_open_pipe_with_append(self): 3985 # bpo-27805: Ignore ESPIPE from lseek() in open(). 3986 r, w = os.pipe() 3987 self.addCleanup(os.close, r) 3988 f = self.open(w, 'a', encoding="utf-8") 3989 self.addCleanup(f.close) 3990 # Check that the file is marked non-seekable. On Windows, however, lseek 3991 # somehow succeeds on pipes. 3992 if sys.platform != 'win32': 3993 self.assertFalse(f.seekable()) 3994 3995 def test_io_after_close(self): 3996 for kwargs in [ 3997 {"mode": "w"}, 3998 {"mode": "wb"}, 3999 {"mode": "w", "buffering": 1}, 4000 {"mode": "w", "buffering": 2}, 4001 {"mode": "wb", "buffering": 0}, 4002 {"mode": "r"}, 4003 {"mode": "rb"}, 4004 {"mode": "r", "buffering": 1}, 4005 {"mode": "r", "buffering": 2}, 4006 {"mode": "rb", "buffering": 0}, 4007 {"mode": "w+"}, 4008 {"mode": "w+b"}, 4009 {"mode": "w+", "buffering": 1}, 4010 {"mode": "w+", "buffering": 2}, 4011 {"mode": "w+b", "buffering": 0}, 4012 ]: 4013 if "b" not in kwargs["mode"]: 4014 kwargs["encoding"] = "utf-8" 4015 f = self.open(os_helper.TESTFN, **kwargs) 4016 f.close() 4017 self.assertRaises(ValueError, f.flush) 4018 self.assertRaises(ValueError, f.fileno) 4019 self.assertRaises(ValueError, f.isatty) 4020 self.assertRaises(ValueError, f.__iter__) 4021 if hasattr(f, "peek"): 4022 self.assertRaises(ValueError, f.peek, 1) 4023 self.assertRaises(ValueError, f.read) 4024 if hasattr(f, "read1"): 4025 self.assertRaises(ValueError, f.read1, 1024) 4026 
self.assertRaises(ValueError, f.read1) 4027 if hasattr(f, "readall"): 4028 self.assertRaises(ValueError, f.readall) 4029 if hasattr(f, "readinto"): 4030 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 4031 if hasattr(f, "readinto1"): 4032 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 4033 self.assertRaises(ValueError, f.readline) 4034 self.assertRaises(ValueError, f.readlines) 4035 self.assertRaises(ValueError, f.readlines, 1) 4036 self.assertRaises(ValueError, f.seek, 0) 4037 self.assertRaises(ValueError, f.tell) 4038 self.assertRaises(ValueError, f.truncate) 4039 self.assertRaises(ValueError, f.write, 4040 b"" if "b" in kwargs['mode'] else "") 4041 self.assertRaises(ValueError, f.writelines, []) 4042 self.assertRaises(ValueError, next, f) 4043 4044 def test_blockingioerror(self): 4045 # Various BlockingIOError issues 4046 class C(str): 4047 pass 4048 c = C("") 4049 b = self.BlockingIOError(1, c) 4050 c.b = b 4051 b.c = c 4052 wr = weakref.ref(c) 4053 del c, b 4054 support.gc_collect() 4055 self.assertIsNone(wr(), wr) 4056 4057 def test_abcs(self): 4058 # Test the visible base classes are ABCs. 
4059 self.assertIsInstance(self.IOBase, abc.ABCMeta) 4060 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 4061 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 4062 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 4063 4064 def _check_abc_inheritance(self, abcmodule): 4065 with self.open(os_helper.TESTFN, "wb", buffering=0) as f: 4066 self.assertIsInstance(f, abcmodule.IOBase) 4067 self.assertIsInstance(f, abcmodule.RawIOBase) 4068 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4069 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4070 with self.open(os_helper.TESTFN, "wb") as f: 4071 self.assertIsInstance(f, abcmodule.IOBase) 4072 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4073 self.assertIsInstance(f, abcmodule.BufferedIOBase) 4074 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4075 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 4076 self.assertIsInstance(f, abcmodule.IOBase) 4077 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4078 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4079 self.assertIsInstance(f, abcmodule.TextIOBase) 4080 4081 def test_abc_inheritance(self): 4082 # Test implementations inherit from their respective ABCs 4083 self._check_abc_inheritance(self) 4084 4085 def test_abc_inheritance_official(self): 4086 # Test implementations inherit from the official ABCs of the 4087 # baseline "io" module. 
4088 self._check_abc_inheritance(io) 4089 4090 def _check_warn_on_dealloc(self, *args, **kwargs): 4091 f = open(*args, **kwargs) 4092 r = repr(f) 4093 with self.assertWarns(ResourceWarning) as cm: 4094 f = None 4095 support.gc_collect() 4096 self.assertIn(r, str(cm.warning.args[0])) 4097 4098 def test_warn_on_dealloc(self): 4099 self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0) 4100 self._check_warn_on_dealloc(os_helper.TESTFN, "wb") 4101 self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8") 4102 4103 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 4104 fds = [] 4105 def cleanup_fds(): 4106 for fd in fds: 4107 try: 4108 os.close(fd) 4109 except OSError as e: 4110 if e.errno != errno.EBADF: 4111 raise 4112 self.addCleanup(cleanup_fds) 4113 r, w = os.pipe() 4114 fds += r, w 4115 self._check_warn_on_dealloc(r, *args, **kwargs) 4116 # When using closefd=False, there's no warning 4117 r, w = os.pipe() 4118 fds += r, w 4119 with warnings_helper.check_no_resource_warning(self): 4120 open(r, *args, closefd=False, **kwargs) 4121 4122 def test_warn_on_dealloc_fd(self): 4123 self._check_warn_on_dealloc_fd("rb", buffering=0) 4124 self._check_warn_on_dealloc_fd("rb") 4125 self._check_warn_on_dealloc_fd("r", encoding="utf-8") 4126 4127 4128 def test_pickling(self): 4129 # Pickling file objects is forbidden 4130 for kwargs in [ 4131 {"mode": "w"}, 4132 {"mode": "wb"}, 4133 {"mode": "wb", "buffering": 0}, 4134 {"mode": "r"}, 4135 {"mode": "rb"}, 4136 {"mode": "rb", "buffering": 0}, 4137 {"mode": "w+"}, 4138 {"mode": "w+b"}, 4139 {"mode": "w+b", "buffering": 0}, 4140 ]: 4141 if "b" not in kwargs["mode"]: 4142 kwargs["encoding"] = "utf-8" 4143 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 4144 with self.open(os_helper.TESTFN, **kwargs) as f: 4145 self.assertRaises(TypeError, pickle.dumps, f, protocol) 4146 4147 def test_nonblock_pipe_write_bigbuf(self): 4148 self._test_nonblock_pipe_write(16*1024) 4149 4150 def 
test_nonblock_pipe_write_smallbuf(self): 4151 self._test_nonblock_pipe_write(1024) 4152 4153 @unittest.skipUnless(hasattr(os, 'set_blocking'), 4154 'os.set_blocking() required for this test') 4155 def _test_nonblock_pipe_write(self, bufsize): 4156 sent = [] 4157 received = [] 4158 r, w = os.pipe() 4159 os.set_blocking(r, False) 4160 os.set_blocking(w, False) 4161 4162 # To exercise all code paths in the C implementation we need 4163 # to play with buffer sizes. For instance, if we choose a 4164 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 4165 # then we will never get a partial write of the buffer. 4166 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 4167 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 4168 4169 with rf, wf: 4170 for N in 9999, 73, 7574: 4171 try: 4172 i = 0 4173 while True: 4174 msg = bytes([i % 26 + 97]) * N 4175 sent.append(msg) 4176 wf.write(msg) 4177 i += 1 4178 4179 except self.BlockingIOError as e: 4180 self.assertEqual(e.args[0], errno.EAGAIN) 4181 self.assertEqual(e.args[2], e.characters_written) 4182 sent[-1] = sent[-1][:e.characters_written] 4183 received.append(rf.read()) 4184 msg = b'BLOCKED' 4185 wf.write(msg) 4186 sent.append(msg) 4187 4188 while True: 4189 try: 4190 wf.flush() 4191 break 4192 except self.BlockingIOError as e: 4193 self.assertEqual(e.args[0], errno.EAGAIN) 4194 self.assertEqual(e.args[2], e.characters_written) 4195 self.assertEqual(e.characters_written, 0) 4196 received.append(rf.read()) 4197 4198 received += iter(rf.read, None) 4199 4200 sent, received = b''.join(sent), b''.join(received) 4201 self.assertEqual(sent, received) 4202 self.assertTrue(wf.closed) 4203 self.assertTrue(rf.closed) 4204 4205 def test_create_fail(self): 4206 # 'x' mode fails if file is existing 4207 with self.open(os_helper.TESTFN, 'w', encoding="utf-8"): 4208 pass 4209 self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8") 4210 4211 def test_create_writes(self): 4212 
# 'x' mode opens for writing 4213 with self.open(os_helper.TESTFN, 'xb') as f: 4214 f.write(b"spam") 4215 with self.open(os_helper.TESTFN, 'rb') as f: 4216 self.assertEqual(b"spam", f.read()) 4217 4218 def test_open_allargs(self): 4219 # there used to be a buffer overflow in the parser for rawmode 4220 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8") 4221 4222 def test_check_encoding_errors(self): 4223 # bpo-37388: open() and TextIOWrapper must check encoding and errors 4224 # arguments in dev mode 4225 mod = self.io.__name__ 4226 filename = __file__ 4227 invalid = 'Boom, Shaka Laka, Boom!' 4228 code = textwrap.dedent(f''' 4229 import sys 4230 from {mod} import open, TextIOWrapper 4231 4232 try: 4233 open({filename!r}, encoding={invalid!r}) 4234 except LookupError: 4235 pass 4236 else: 4237 sys.exit(21) 4238 4239 try: 4240 open({filename!r}, errors={invalid!r}) 4241 except LookupError: 4242 pass 4243 else: 4244 sys.exit(22) 4245 4246 fp = open({filename!r}, "rb") 4247 with fp: 4248 try: 4249 TextIOWrapper(fp, encoding={invalid!r}) 4250 except LookupError: 4251 pass 4252 else: 4253 sys.exit(23) 4254 4255 try: 4256 TextIOWrapper(fp, errors={invalid!r}) 4257 except LookupError: 4258 pass 4259 else: 4260 sys.exit(24) 4261 4262 sys.exit(10) 4263 ''') 4264 proc = assert_python_failure('-X', 'dev', '-c', code) 4265 self.assertEqual(proc.rc, 10, proc) 4266 4267 def test_check_encoding_warning(self): 4268 # PEP 597: Raise warning when encoding is not specified 4269 # and sys.flags.warn_default_encoding is set. 
4270 mod = self.io.__name__ 4271 filename = __file__ 4272 code = textwrap.dedent(f'''\ 4273 import sys 4274 from {mod} import open, TextIOWrapper 4275 import pathlib 4276 4277 with open({filename!r}) as f: # line 5 4278 pass 4279 4280 pathlib.Path({filename!r}).read_text() # line 8 4281 ''') 4282 proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code) 4283 warnings = proc.err.splitlines() 4284 self.assertEqual(len(warnings), 2) 4285 self.assertTrue( 4286 warnings[0].startswith(b"<string>:5: EncodingWarning: ")) 4287 self.assertTrue( 4288 warnings[1].startswith(b"<string>:8: EncodingWarning: ")) 4289 4290 @support.cpython_only 4291 # Depending if OpenWrapper was already created or not, the warning is 4292 # emitted or not. For example, the attribute is already created when this 4293 # test is run multiple times. 4294 @warnings_helper.ignore_warnings(category=DeprecationWarning) 4295 def test_openwrapper(self): 4296 self.assertIs(self.io.OpenWrapper, self.io.open) 4297 4298 4299class CMiscIOTest(MiscIOTest): 4300 io = io 4301 4302 def test_readinto_buffer_overflow(self): 4303 # Issue #18025 4304 class BadReader(self.io.BufferedIOBase): 4305 def read(self, n=-1): 4306 return b'x' * 10**6 4307 bufio = BadReader() 4308 b = bytearray(2) 4309 self.assertRaises(ValueError, bufio.readinto, b) 4310 4311 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4312 # Issue #23309: deadlocks at shutdown should be avoided when a 4313 # daemon thread and the main thread both write to a file. 
4314 code = """if 1: 4315 import sys 4316 import time 4317 import threading 4318 from test.support import SuppressCrashReport 4319 4320 file = sys.{stream_name} 4321 4322 def run(): 4323 while True: 4324 file.write('.') 4325 file.flush() 4326 4327 crash = SuppressCrashReport() 4328 crash.__enter__() 4329 # don't call __exit__(): the crash occurs at Python shutdown 4330 4331 thread = threading.Thread(target=run) 4332 thread.daemon = True 4333 thread.start() 4334 4335 time.sleep(0.5) 4336 file.write('!') 4337 file.flush() 4338 """.format_map(locals()) 4339 res, _ = run_python_until_end("-c", code) 4340 err = res.err.decode() 4341 if res.rc != 0: 4342 # Failure: should be a fatal error 4343 pattern = (r"Fatal Python error: _enter_buffered_busy: " 4344 r"could not acquire lock " 4345 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> " 4346 r"at interpreter shutdown, possibly due to " 4347 r"daemon threads".format_map(locals())) 4348 self.assertRegex(err, pattern) 4349 else: 4350 self.assertFalse(err.strip('.!')) 4351 4352 def test_daemon_threads_shutdown_stdout_deadlock(self): 4353 self.check_daemon_threads_shutdown_deadlock('stdout') 4354 4355 def test_daemon_threads_shutdown_stderr_deadlock(self): 4356 self.check_daemon_threads_shutdown_deadlock('stderr') 4357 4358 4359class PyMiscIOTest(MiscIOTest): 4360 io = pyio 4361 4362 4363@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 4364class SignalsTest(unittest.TestCase): 4365 4366 def setUp(self): 4367 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 4368 4369 def tearDown(self): 4370 signal.signal(signal.SIGALRM, self.oldalrm) 4371 4372 def alarm_interrupt(self, sig, frame): 4373 1/0 4374 4375 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 4376 """Check that a partial write, when it gets interrupted, properly 4377 invokes the signal handler, and bubbles up the exception raised 4378 in the latter.""" 4379 4380 # XXX This test has three flaws that appear 
when objects are 4381 # XXX not reference counted. 4382 4383 # - if wio.write() happens to trigger a garbage collection, 4384 # the signal exception may be raised when some __del__ 4385 # method is running; it will not reach the assertRaises() 4386 # call. 4387 4388 # - more subtle, if the wio object is not destroyed at once 4389 # and survives this function, the next opened file is likely 4390 # to have the same fileno (since the file descriptor was 4391 # actively closed). When wio.__del__ is finally called, it 4392 # will close the other's test file... To trigger this with 4393 # CPython, try adding "global wio" in this function. 4394 4395 # - This happens only for streams created by the _pyio module, 4396 # because a wio.close() that fails still consider that the 4397 # file needs to be closed again. You can try adding an 4398 # "assert wio.closed" at the end of the function. 4399 4400 # Fortunately, a little gc.collect() seems to be enough to 4401 # work around all these issues. 4402 support.gc_collect() # For PyPy or other GCs. 4403 4404 read_results = [] 4405 def _read(): 4406 s = os.read(r, 1) 4407 read_results.append(s) 4408 4409 t = threading.Thread(target=_read) 4410 t.daemon = True 4411 r, w = os.pipe() 4412 fdopen_kwargs["closefd"] = False 4413 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) 4414 try: 4415 wio = self.io.open(w, **fdopen_kwargs) 4416 if hasattr(signal, 'pthread_sigmask'): 4417 # create the thread with SIGALRM signal blocked 4418 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) 4419 t.start() 4420 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM]) 4421 else: 4422 t.start() 4423 4424 # Fill the pipe enough that the write will be blocking. 4425 # It will be interrupted by the timer armed above. Since the 4426 # other thread has read one byte, the low-level write will 4427 # return with a successful (partial) result rather than an EINTR. 
4428 # The buffered IO layer must check for pending signal 4429 # handlers, which in this case will invoke alarm_interrupt(). 4430 signal.alarm(1) 4431 try: 4432 self.assertRaises(ZeroDivisionError, wio.write, large_data) 4433 finally: 4434 signal.alarm(0) 4435 t.join() 4436 # We got one byte, get another one and check that it isn't a 4437 # repeat of the first one. 4438 read_results.append(os.read(r, 1)) 4439 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 4440 finally: 4441 os.close(w) 4442 os.close(r) 4443 # This is deliberate. If we didn't close the file descriptor 4444 # before closing wio, wio would try to flush its internal 4445 # buffer, and block again. 4446 try: 4447 wio.close() 4448 except OSError as e: 4449 if e.errno != errno.EBADF: 4450 raise 4451 4452 def test_interrupted_write_unbuffered(self): 4453 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 4454 4455 def test_interrupted_write_buffered(self): 4456 self.check_interrupted_write(b"xy", b"xy", mode="wb") 4457 4458 def test_interrupted_write_text(self): 4459 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 4460 4461 @support.no_tracing 4462 def check_reentrant_write(self, data, **fdopen_kwargs): 4463 def on_alarm(*args): 4464 # Will be called reentrantly from the same thread 4465 wio.write(data) 4466 1/0 4467 signal.signal(signal.SIGALRM, on_alarm) 4468 r, w = os.pipe() 4469 wio = self.io.open(w, **fdopen_kwargs) 4470 try: 4471 signal.alarm(1) 4472 # Either the reentrant call to wio.write() fails with RuntimeError, 4473 # or the signal handler raises ZeroDivisionError. 
4474 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 4475 while 1: 4476 for i in range(100): 4477 wio.write(data) 4478 wio.flush() 4479 # Make sure the buffer doesn't fill up and block further writes 4480 os.read(r, len(data) * 100) 4481 exc = cm.exception 4482 if isinstance(exc, RuntimeError): 4483 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 4484 finally: 4485 signal.alarm(0) 4486 wio.close() 4487 os.close(r) 4488 4489 def test_reentrant_write_buffered(self): 4490 self.check_reentrant_write(b"xy", mode="wb") 4491 4492 def test_reentrant_write_text(self): 4493 self.check_reentrant_write("xy", mode="w", encoding="ascii") 4494 4495 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 4496 """Check that a buffered read, when it gets interrupted (either 4497 returning a partial result or EINTR), properly invokes the signal 4498 handler and retries if the latter returned successfully.""" 4499 r, w = os.pipe() 4500 fdopen_kwargs["closefd"] = False 4501 def alarm_handler(sig, frame): 4502 os.write(w, b"bar") 4503 signal.signal(signal.SIGALRM, alarm_handler) 4504 try: 4505 rio = self.io.open(r, **fdopen_kwargs) 4506 os.write(w, b"foo") 4507 signal.alarm(1) 4508 # Expected behaviour: 4509 # - first raw read() returns partial b"foo" 4510 # - second raw read() returns EINTR 4511 # - third raw read() returns b"bar" 4512 self.assertEqual(decode(rio.read(6)), "foobar") 4513 finally: 4514 signal.alarm(0) 4515 rio.close() 4516 os.close(w) 4517 os.close(r) 4518 4519 def test_interrupted_read_retry_buffered(self): 4520 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 4521 mode="rb") 4522 4523 def test_interrupted_read_retry_text(self): 4524 self.check_interrupted_read_retry(lambda x: x, 4525 mode="r", encoding="latin1") 4526 4527 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 4528 """Check that a buffered write, when it gets interrupted (either 4529 returning a partial result or EINTR), properly 
invokes the signal 4530 handler and retries if the latter returned successfully.""" 4531 select = import_helper.import_module("select") 4532 4533 # A quantity that exceeds the buffer size of an anonymous pipe's 4534 # write end. 4535 N = support.PIPE_MAX_SIZE 4536 r, w = os.pipe() 4537 fdopen_kwargs["closefd"] = False 4538 4539 # We need a separate thread to read from the pipe and allow the 4540 # write() to finish. This thread is started after the SIGALRM is 4541 # received (forcing a first EINTR in write()). 4542 read_results = [] 4543 write_finished = False 4544 error = None 4545 def _read(): 4546 try: 4547 while not write_finished: 4548 while r in select.select([r], [], [], 1.0)[0]: 4549 s = os.read(r, 1024) 4550 read_results.append(s) 4551 except BaseException as exc: 4552 nonlocal error 4553 error = exc 4554 t = threading.Thread(target=_read) 4555 t.daemon = True 4556 def alarm1(sig, frame): 4557 signal.signal(signal.SIGALRM, alarm2) 4558 signal.alarm(1) 4559 def alarm2(sig, frame): 4560 t.start() 4561 4562 large_data = item * N 4563 signal.signal(signal.SIGALRM, alarm1) 4564 try: 4565 wio = self.io.open(w, **fdopen_kwargs) 4566 signal.alarm(1) 4567 # Expected behaviour: 4568 # - first raw write() is partial (because of the limited pipe buffer 4569 # and the first alarm) 4570 # - second raw write() returns EINTR (because of the second alarm) 4571 # - subsequent write()s are successful (either partial or complete) 4572 written = wio.write(large_data) 4573 self.assertEqual(N, written) 4574 4575 wio.flush() 4576 write_finished = True 4577 t.join() 4578 4579 self.assertIsNone(error) 4580 self.assertEqual(N, sum(len(x) for x in read_results)) 4581 finally: 4582 signal.alarm(0) 4583 write_finished = True 4584 os.close(w) 4585 os.close(r) 4586 # This is deliberate. If we didn't close the file descriptor 4587 # before closing wio, wio would try to flush its internal 4588 # buffer, and could block (in case of failure). 
4589 try: 4590 wio.close() 4591 except OSError as e: 4592 if e.errno != errno.EBADF: 4593 raise 4594 4595 def test_interrupted_write_retry_buffered(self): 4596 self.check_interrupted_write_retry(b"x", mode="wb") 4597 4598 def test_interrupted_write_retry_text(self): 4599 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 4600 4601 4602class CSignalsTest(SignalsTest): 4603 io = io 4604 4605class PySignalsTest(SignalsTest): 4606 io = pyio 4607 4608 # Handling reentrancy issues would slow down _pyio even more, so the 4609 # tests are disabled. 4610 test_reentrant_write_buffered = None 4611 test_reentrant_write_text = None 4612 4613 4614def load_tests(*args): 4615 tests = (CIOTest, PyIOTest, APIMismatchTest, 4616 CBufferedReaderTest, PyBufferedReaderTest, 4617 CBufferedWriterTest, PyBufferedWriterTest, 4618 CBufferedRWPairTest, PyBufferedRWPairTest, 4619 CBufferedRandomTest, PyBufferedRandomTest, 4620 StatefulIncrementalDecoderTest, 4621 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 4622 CTextIOWrapperTest, PyTextIOWrapperTest, 4623 CMiscIOTest, PyMiscIOTest, 4624 CSignalsTest, PySignalsTest, 4625 ) 4626 4627 # Put the namespaces of the IO module we are testing and some useful mock 4628 # classes in the __dict__ of each test. 
4629 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 4630 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead, 4631 SlowFlushRawIO) 4632 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 4633 c_io_ns = {name : getattr(io, name) for name in all_members} 4634 py_io_ns = {name : getattr(pyio, name) for name in all_members} 4635 globs = globals() 4636 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 4637 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 4638 for test in tests: 4639 if test.__name__.startswith("C"): 4640 for name, obj in c_io_ns.items(): 4641 setattr(test, name, obj) 4642 elif test.__name__.startswith("Py"): 4643 for name, obj in py_io_ns.items(): 4644 setattr(test, name, obj) 4645 4646 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests]) 4647 return suite 4648 4649if __name__ == "__main__": 4650 unittest.main() 4651