1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import textwrap 32import threading 33import time 34import unittest 35import warnings 36import weakref 37from collections import deque, UserList 38from itertools import cycle, count 39from test import support 40from test.support.script_helper import ( 41 assert_python_ok, assert_python_failure, run_python_until_end) 42from test.support import import_helper 43from test.support import os_helper 44from test.support import threading_helper 45from test.support import warnings_helper 46from test.support import skip_if_sanitizer 47from test.support.os_helper import FakePath 48 49import codecs 50import io # C implementation of io 51import _pyio as pyio # Python implementation of io 52 53try: 54 import ctypes 55except ImportError: 56 def byteslike(*pos, **kw): 57 return array.array("b", bytes(*pos, **kw)) 58else: 59 def byteslike(*pos, **kw): 60 """Create a bytes-like object having no string or sequence methods""" 61 data = bytes(*pos, **kw) 62 obj = EmptyStruct() 63 ctypes.resize(obj, len(data)) 64 memoryview(obj).cast("B")[:] = data 65 return obj 66 class EmptyStruct(ctypes.Structure): 67 pass 68 69# Does io.IOBase finalizer log the exception if the close() method fails? 70# The exception is ignored silently by default in release build. 71IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode) 72 73 74def _default_chunk_size(): 75 """Get the default TextIOWrapper chunk size""" 76 with open(__file__, "r", encoding="latin-1") as f: 77 return f._CHUNK_SIZE 78 79 80class MockRawIOWithoutRead: 81 """A RawIO implementation without read(), so as to exercise the default 82 RawIO.read() which calls readinto().""" 83 84 def __init__(self, read_stack=()): 85 self._read_stack = list(read_stack) 86 self._write_stack = [] 87 self._reads = 0 88 self._extraneous_reads = 0 89 90 def write(self, b): 91 self._write_stack.append(bytes(b)) 92 return len(b) 93 94 def writable(self): 95 return True 96 97 def fileno(self): 98 return 42 99 100 def readable(self): 101 return True 102 103 def seekable(self): 104 return True 105 106 def seek(self, pos, whence): 107 return 0 # wrong but we gotta return something 108 109 def tell(self): 110 return 0 # same comment as above 111 112 def readinto(self, buf): 113 self._reads += 1 114 max_len = len(buf) 115 try: 116 data = self._read_stack[0] 117 except IndexError: 118 self._extraneous_reads += 1 119 return 0 120 if data is None: 121 del self._read_stack[0] 122 return None 123 n = len(data) 124 if len(data) <= max_len: 125 del self._read_stack[0] 126 buf[:n] = data 127 return n 128 else: 129 buf[:] = data[:max_len] 130 self._read_stack[0] = data[max_len:] 131 return max_len 132 133 def truncate(self, pos=None): 134 return pos 135 136class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 137 pass 138 139class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 140 pass 141 142 143class MockRawIO(MockRawIOWithoutRead): 144 145 def read(self, n=None): 146 self._reads += 1 147 try: 148 return self._read_stack.pop(0) 149 except: 150 self._extraneous_reads += 1 151 return b"" 152 153class CMockRawIO(MockRawIO, io.RawIOBase): 154 pass 155 156class PyMockRawIO(MockRawIO, pyio.RawIOBase): 157 pass 158 159 160class MisbehavedRawIO(MockRawIO): 161 def write(self, b): 162 return super().write(b) * 2 163 164 def read(self, n=None): 165 return super().read(n) * 2 166 167 def seek(self, pos, whence): 168 return -123 169 170 def tell(self): 171 return -456 172 173 def readinto(self, buf): 174 super().readinto(buf) 175 return len(buf) * 5 176 177class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 178 pass 179 180class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 181 pass 182 183 184class SlowFlushRawIO(MockRawIO): 185 def __init__(self): 186 super().__init__() 187 self.in_flush = threading.Event() 188 189 def flush(self): 190 self.in_flush.set() 191 time.sleep(0.25) 192 193class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 194 pass 195 196class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 197 pass 198 199 200class CloseFailureIO(MockRawIO): 201 closed = 0 202 203 def close(self): 204 if not self.closed: 205 self.closed = 1 206 raise OSError 207 208class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 209 pass 210 211class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 212 pass 213 214 215class MockFileIO: 216 217 def __init__(self, data): 218 self.read_history = [] 219 super().__init__(data) 220 221 def read(self, n=None): 222 res = super().read(n) 223 self.read_history.append(None if res is None else len(res)) 224 return res 225 226 def readinto(self, b): 227 res = super().readinto(b) 228 self.read_history.append(res) 229 return res 230 231class CMockFileIO(MockFileIO, io.BytesIO): 232 pass 233 234class PyMockFileIO(MockFileIO, pyio.BytesIO): 235 pass 236 237 238class MockUnseekableIO: 239 def seekable(self): 240 return False 241 242 def seek(self, *args): 243 raise self.UnsupportedOperation("not seekable") 244 245 def tell(self, *args): 246 raise self.UnsupportedOperation("not seekable") 247 248 def truncate(self, *args): 249 raise self.UnsupportedOperation("not seekable") 250 251class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 252 UnsupportedOperation = io.UnsupportedOperation 253 254class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 255 UnsupportedOperation = pyio.UnsupportedOperation 256 257 258class MockNonBlockWriterIO: 259 260 def __init__(self): 261 self._write_stack = [] 262 self._blocker_char = None 263 264 def pop_written(self): 265 s = b"".join(self._write_stack) 266 self._write_stack[:] = [] 267 return s 268 269 def block_on(self, char): 270 """Block when a given char is encountered.""" 271 self._blocker_char = char 272 273 def readable(self): 274 return True 275 276 def seekable(self): 277 return True 278 279 def seek(self, pos, whence=0): 280 # naive implementation, enough for tests 281 return 0 282 283 def writable(self): 284 return True 285 286 def write(self, b): 287 b = bytes(b) 288 n = -1 289 if self._blocker_char: 290 try: 291 n = b.index(self._blocker_char) 292 except ValueError: 293 pass 294 else: 295 if n > 0: 296 # write data up to the first blocker 297 self._write_stack.append(b[:n]) 298 return n 299 else: 300 # cancel blocker and indicate would block 301 self._blocker_char = None 302 return None 303 self._write_stack.append(b) 304 return len(b) 305 306class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 307 BlockingIOError = io.BlockingIOError 308 309class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 310 BlockingIOError = pyio.BlockingIOError 311 312 313class IOTest(unittest.TestCase): 314 315 def setUp(self): 316 os_helper.unlink(os_helper.TESTFN) 317 318 def tearDown(self): 319 os_helper.unlink(os_helper.TESTFN) 320 321 def write_ops(self, f): 322 self.assertEqual(f.write(b"blah."), 5) 323 f.truncate(0) 324 self.assertEqual(f.tell(), 5) 325 f.seek(0) 326 327 self.assertEqual(f.write(b"blah."), 5) 328 self.assertEqual(f.seek(0), 0) 329 self.assertEqual(f.write(b"Hello."), 6) 330 self.assertEqual(f.tell(), 6) 331 self.assertEqual(f.seek(-1, 1), 5) 332 self.assertEqual(f.tell(), 5) 333 buffer = bytearray(b" world\n\n\n") 334 self.assertEqual(f.write(buffer), 9) 335 buffer[:] = b"*" * 9 # Overwrite our copy of the data 336 self.assertEqual(f.seek(0), 0) 337 self.assertEqual(f.write(b"h"), 1) 338 self.assertEqual(f.seek(-1, 2), 13) 339 self.assertEqual(f.tell(), 13) 340 341 self.assertEqual(f.truncate(12), 12) 342 self.assertEqual(f.tell(), 13) 343 self.assertRaises(TypeError, f.seek, 0.0) 344 345 def read_ops(self, f, buffered=False): 346 data = f.read(5) 347 self.assertEqual(data, b"hello") 348 data = byteslike(data) 349 self.assertEqual(f.readinto(data), 5) 350 self.assertEqual(bytes(data), b" worl") 351 data = bytearray(5) 352 self.assertEqual(f.readinto(data), 2) 353 self.assertEqual(len(data), 5) 354 self.assertEqual(data[:2], b"d\n") 355 self.assertEqual(f.seek(0), 0) 356 self.assertEqual(f.read(20), b"hello world\n") 357 self.assertEqual(f.read(1), b"") 358 self.assertEqual(f.readinto(byteslike(b"x")), 0) 359 self.assertEqual(f.seek(-6, 2), 6) 360 self.assertEqual(f.read(5), b"world") 361 self.assertEqual(f.read(0), b"") 362 self.assertEqual(f.readinto(byteslike()), 0) 363 self.assertEqual(f.seek(-6, 1), 5) 364 self.assertEqual(f.read(5), b" worl") 365 self.assertEqual(f.tell(), 10) 366 self.assertRaises(TypeError, f.seek, 0.0) 367 if buffered: 368 f.seek(0) 369 self.assertEqual(f.read(), b"hello world\n") 370 f.seek(6) 371 self.assertEqual(f.read(), b"world\n") 372 self.assertEqual(f.read(), b"") 373 f.seek(0) 374 data = byteslike(5) 375 self.assertEqual(f.readinto1(data), 5) 376 self.assertEqual(bytes(data), b"hello") 377 378 LARGE = 2**31 379 380 def large_file_ops(self, f): 381 assert f.readable() 382 assert f.writable() 383 try: 384 self.assertEqual(f.seek(self.LARGE), self.LARGE) 385 except (OverflowError, ValueError): 386 self.skipTest("no largefile support") 387 self.assertEqual(f.tell(), self.LARGE) 388 self.assertEqual(f.write(b"xxx"), 3) 389 self.assertEqual(f.tell(), self.LARGE + 3) 390 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 391 self.assertEqual(f.truncate(), self.LARGE + 2) 392 self.assertEqual(f.tell(), self.LARGE + 2) 393 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 394 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 395 self.assertEqual(f.tell(), self.LARGE + 2) 396 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 397 self.assertEqual(f.seek(-1, 2), self.LARGE) 398 self.assertEqual(f.read(2), b"x") 399 400 def test_invalid_operations(self): 401 # Try writing on a file opened in read mode and vice-versa. 402 exc = self.UnsupportedOperation 403 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp: 404 self.assertRaises(exc, fp.read) 405 self.assertRaises(exc, fp.readline) 406 with self.open(os_helper.TESTFN, "wb") as fp: 407 self.assertRaises(exc, fp.read) 408 self.assertRaises(exc, fp.readline) 409 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp: 410 self.assertRaises(exc, fp.read) 411 self.assertRaises(exc, fp.readline) 412 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp: 413 self.assertRaises(exc, fp.write, b"blah") 414 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 415 with self.open(os_helper.TESTFN, "rb") as fp: 416 self.assertRaises(exc, fp.write, b"blah") 417 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 418 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp: 419 self.assertRaises(exc, fp.write, "blah") 420 self.assertRaises(exc, fp.writelines, ["blah\n"]) 421 # Non-zero seeking from current or end pos 422 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 423 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 424 425 def test_optional_abilities(self): 426 # Test for OSError when optional APIs are not supported 427 # The purpose of this test is to try fileno(), reading, writing and 428 # seeking operations with various objects that indicate they do not 429 # support these operations. 430 431 def pipe_reader(): 432 [r, w] = os.pipe() 433 os.close(w) # So that read() is harmless 434 return self.FileIO(r, "r") 435 436 def pipe_writer(): 437 [r, w] = os.pipe() 438 self.addCleanup(os.close, r) 439 # Guarantee that we can write into the pipe without blocking 440 thread = threading.Thread(target=os.read, args=(r, 100)) 441 thread.start() 442 self.addCleanup(thread.join) 443 return self.FileIO(w, "w") 444 445 def buffered_reader(): 446 return self.BufferedReader(self.MockUnseekableIO()) 447 448 def buffered_writer(): 449 return self.BufferedWriter(self.MockUnseekableIO()) 450 451 def buffered_random(): 452 return self.BufferedRandom(self.BytesIO()) 453 454 def buffered_rw_pair(): 455 return self.BufferedRWPair(self.MockUnseekableIO(), 456 self.MockUnseekableIO()) 457 458 def text_reader(): 459 class UnseekableReader(self.MockUnseekableIO): 460 writable = self.BufferedIOBase.writable 461 write = self.BufferedIOBase.write 462 return self.TextIOWrapper(UnseekableReader(), "ascii") 463 464 def text_writer(): 465 class UnseekableWriter(self.MockUnseekableIO): 466 readable = self.BufferedIOBase.readable 467 read = self.BufferedIOBase.read 468 return self.TextIOWrapper(UnseekableWriter(), "ascii") 469 470 tests = ( 471 (pipe_reader, "fr"), (pipe_writer, "fw"), 472 (buffered_reader, "r"), (buffered_writer, "w"), 473 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 474 (text_reader, "r"), (text_writer, "w"), 475 (self.BytesIO, "rws"), (self.StringIO, "rws"), 476 ) 477 for [test, abilities] in tests: 478 with self.subTest(test), test() as obj: 479 readable = "r" in abilities 480 self.assertEqual(obj.readable(), readable) 481 writable = "w" in abilities 482 self.assertEqual(obj.writable(), writable) 483 484 if isinstance(obj, self.TextIOBase): 485 data = "3" 486 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 487 data = b"3" 488 else: 489 self.fail("Unknown base class") 490 491 if "f" in abilities: 492 obj.fileno() 493 else: 494 self.assertRaises(OSError, obj.fileno) 495 496 if readable: 497 obj.read(1) 498 obj.read() 499 else: 500 self.assertRaises(OSError, obj.read, 1) 501 self.assertRaises(OSError, obj.read) 502 503 if writable: 504 obj.write(data) 505 else: 506 self.assertRaises(OSError, obj.write, data) 507 508 if sys.platform.startswith("win") and test in ( 509 pipe_reader, pipe_writer): 510 # Pipes seem to appear as seekable on Windows 511 continue 512 seekable = "s" in abilities 513 self.assertEqual(obj.seekable(), seekable) 514 515 if seekable: 516 obj.tell() 517 obj.seek(0) 518 else: 519 self.assertRaises(OSError, obj.tell) 520 self.assertRaises(OSError, obj.seek, 0) 521 522 if writable and seekable: 523 obj.truncate() 524 obj.truncate(0) 525 else: 526 self.assertRaises(OSError, obj.truncate) 527 self.assertRaises(OSError, obj.truncate, 0) 528 529 def test_open_handles_NUL_chars(self): 530 fn_with_NUL = 'foo\0bar' 531 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8") 532 533 bytes_fn = bytes(fn_with_NUL, 'ascii') 534 with warnings.catch_warnings(): 535 warnings.simplefilter("ignore", DeprecationWarning) 536 self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8") 537 538 def test_raw_file_io(self): 539 with self.open(os_helper.TESTFN, "wb", buffering=0) as f: 540 self.assertEqual(f.readable(), False) 541 self.assertEqual(f.writable(), True) 542 self.assertEqual(f.seekable(), True) 543 self.write_ops(f) 544 with self.open(os_helper.TESTFN, "rb", buffering=0) as f: 545 self.assertEqual(f.readable(), True) 546 self.assertEqual(f.writable(), False) 547 self.assertEqual(f.seekable(), True) 548 self.read_ops(f) 549 550 def test_buffered_file_io(self): 551 with self.open(os_helper.TESTFN, "wb") as f: 552 self.assertEqual(f.readable(), False) 553 self.assertEqual(f.writable(), True) 554 self.assertEqual(f.seekable(), True) 555 self.write_ops(f) 556 with self.open(os_helper.TESTFN, "rb") as f: 557 self.assertEqual(f.readable(), True) 558 self.assertEqual(f.writable(), False) 559 self.assertEqual(f.seekable(), True) 560 self.read_ops(f, True) 561 562 def test_readline(self): 563 with self.open(os_helper.TESTFN, "wb") as f: 564 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 565 with self.open(os_helper.TESTFN, "rb") as f: 566 self.assertEqual(f.readline(), b"abc\n") 567 self.assertEqual(f.readline(10), b"def\n") 568 self.assertEqual(f.readline(2), b"xy") 569 self.assertEqual(f.readline(4), b"zzy\n") 570 self.assertEqual(f.readline(), b"foo\x00bar\n") 571 self.assertEqual(f.readline(None), b"another line") 572 self.assertRaises(TypeError, f.readline, 5.3) 573 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 574 self.assertRaises(TypeError, f.readline, 5.3) 575 576 def test_readline_nonsizeable(self): 577 # Issue #30061 578 # Crash when readline() returns an object without __len__ 579 class R(self.IOBase): 580 def readline(self): 581 return None 582 self.assertRaises((TypeError, StopIteration), next, R()) 583 584 def test_next_nonsizeable(self): 585 # Issue #30061 586 # Crash when __next__() returns an object without __len__ 587 class R(self.IOBase): 588 def __next__(self): 589 return None 590 self.assertRaises(TypeError, R().readlines, 1) 591 592 def test_raw_bytes_io(self): 593 f = self.BytesIO() 594 self.write_ops(f) 595 data = f.getvalue() 596 self.assertEqual(data, b"hello world\n") 597 f = self.BytesIO(data) 598 self.read_ops(f, True) 599 600 def test_large_file_ops(self): 601 # On Windows and Mac OSX this test consumes large resources; It takes 602 # a long time to build the >2 GiB file and takes >2 GiB of disk space 603 # therefore the resource must be enabled to run this test. 604 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 605 support.requires( 606 'largefile', 607 'test requires %s bytes and a long time to run' % self.LARGE) 608 with self.open(os_helper.TESTFN, "w+b", 0) as f: 609 self.large_file_ops(f) 610 with self.open(os_helper.TESTFN, "w+b") as f: 611 self.large_file_ops(f) 612 613 def test_with_open(self): 614 for bufsize in (0, 100): 615 f = None 616 with self.open(os_helper.TESTFN, "wb", bufsize) as f: 617 f.write(b"xxx") 618 self.assertEqual(f.closed, True) 619 f = None 620 try: 621 with self.open(os_helper.TESTFN, "wb", bufsize) as f: 622 1/0 623 except ZeroDivisionError: 624 self.assertEqual(f.closed, True) 625 else: 626 self.fail("1/0 didn't raise an exception") 627 628 # issue 5008 629 def test_append_mode_tell(self): 630 with self.open(os_helper.TESTFN, "wb") as f: 631 f.write(b"xxx") 632 with self.open(os_helper.TESTFN, "ab", buffering=0) as f: 633 self.assertEqual(f.tell(), 3) 634 with self.open(os_helper.TESTFN, "ab") as f: 635 self.assertEqual(f.tell(), 3) 636 with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f: 637 self.assertGreater(f.tell(), 0) 638 639 def test_destructor(self): 640 record = [] 641 class MyFileIO(self.FileIO): 642 def __del__(self): 643 record.append(1) 644 try: 645 f = super().__del__ 646 except AttributeError: 647 pass 648 else: 649 f() 650 def close(self): 651 record.append(2) 652 super().close() 653 def flush(self): 654 record.append(3) 655 super().flush() 656 with warnings_helper.check_warnings(('', ResourceWarning)): 657 f = MyFileIO(os_helper.TESTFN, "wb") 658 f.write(b"xxx") 659 del f 660 support.gc_collect() 661 self.assertEqual(record, [1, 2, 3]) 662 with self.open(os_helper.TESTFN, "rb") as f: 663 self.assertEqual(f.read(), b"xxx") 664 665 def _check_base_destructor(self, base): 666 record = [] 667 class MyIO(base): 668 def __init__(self): 669 # This exercises the availability of attributes on object 670 # destruction. 671 # (in the C version, close() is called by the tp_dealloc 672 # function, not by __del__) 673 self.on_del = 1 674 self.on_close = 2 675 self.on_flush = 3 676 def __del__(self): 677 record.append(self.on_del) 678 try: 679 f = super().__del__ 680 except AttributeError: 681 pass 682 else: 683 f() 684 def close(self): 685 record.append(self.on_close) 686 super().close() 687 def flush(self): 688 record.append(self.on_flush) 689 super().flush() 690 f = MyIO() 691 del f 692 support.gc_collect() 693 self.assertEqual(record, [1, 2, 3]) 694 695 def test_IOBase_destructor(self): 696 self._check_base_destructor(self.IOBase) 697 698 def test_RawIOBase_destructor(self): 699 self._check_base_destructor(self.RawIOBase) 700 701 def test_BufferedIOBase_destructor(self): 702 self._check_base_destructor(self.BufferedIOBase) 703 704 def test_TextIOBase_destructor(self): 705 self._check_base_destructor(self.TextIOBase) 706 707 def test_close_flushes(self): 708 with self.open(os_helper.TESTFN, "wb") as f: 709 f.write(b"xxx") 710 with self.open(os_helper.TESTFN, "rb") as f: 711 self.assertEqual(f.read(), b"xxx") 712 713 def test_array_writes(self): 714 a = array.array('i', range(10)) 715 n = len(a.tobytes()) 716 def check(f): 717 with f: 718 self.assertEqual(f.write(a), n) 719 f.writelines((a,)) 720 check(self.BytesIO()) 721 check(self.FileIO(os_helper.TESTFN, "w")) 722 check(self.BufferedWriter(self.MockRawIO())) 723 check(self.BufferedRandom(self.MockRawIO())) 724 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 725 726 def test_closefd(self): 727 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w', 728 encoding="utf-8", closefd=False) 729 730 def test_read_closed(self): 731 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 732 f.write("egg\n") 733 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 734 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) 735 self.assertEqual(file.read(), "egg\n") 736 file.seek(0) 737 file.close() 738 self.assertRaises(ValueError, file.read) 739 with self.open(os_helper.TESTFN, "rb") as f: 740 file = self.open(f.fileno(), "rb", closefd=False) 741 self.assertEqual(file.read()[:3], b"egg") 742 file.close() 743 self.assertRaises(ValueError, file.readinto, bytearray(1)) 744 745 def test_no_closefd_with_filename(self): 746 # can't use closefd in combination with a file name 747 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", 748 encoding="utf-8", closefd=False) 749 750 def test_closefd_attr(self): 751 with self.open(os_helper.TESTFN, "wb") as f: 752 f.write(b"egg\n") 753 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 754 self.assertEqual(f.buffer.raw.closefd, True) 755 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) 756 self.assertEqual(file.buffer.raw.closefd, False) 757 758 def test_garbage_collection(self): 759 # FileIO objects are collected, and collecting them flushes 760 # all data to disk. 761 with warnings_helper.check_warnings(('', ResourceWarning)): 762 f = self.FileIO(os_helper.TESTFN, "wb") 763 f.write(b"abcxxx") 764 f.f = f 765 wr = weakref.ref(f) 766 del f 767 support.gc_collect() 768 self.assertIsNone(wr(), wr) 769 with self.open(os_helper.TESTFN, "rb") as f: 770 self.assertEqual(f.read(), b"abcxxx") 771 772 def test_unbounded_file(self): 773 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 774 zero = "/dev/zero" 775 if not os.path.exists(zero): 776 self.skipTest("{0} does not exist".format(zero)) 777 if sys.maxsize > 0x7FFFFFFF: 778 self.skipTest("test can only run in a 32-bit address space") 779 if support.real_max_memuse < support._2G: 780 self.skipTest("test requires at least 2 GiB of memory") 781 with self.open(zero, "rb", buffering=0) as f: 782 self.assertRaises(OverflowError, f.read) 783 with self.open(zero, "rb") as f: 784 self.assertRaises(OverflowError, f.read) 785 with self.open(zero, "r") as f: 786 self.assertRaises(OverflowError, f.read) 787 788 def check_flush_error_on_close(self, *args, **kwargs): 789 # Test that the file is closed despite failed flush 790 # and that flush() is called before file closed. 791 f = self.open(*args, **kwargs) 792 closed = [] 793 def bad_flush(): 794 closed[:] = [f.closed] 795 raise OSError() 796 f.flush = bad_flush 797 self.assertRaises(OSError, f.close) # exception not swallowed 798 self.assertTrue(f.closed) 799 self.assertTrue(closed) # flush() called 800 self.assertFalse(closed[0]) # flush() called before file closed 801 f.flush = lambda: None # break reference loop 802 803 def test_flush_error_on_close(self): 804 # raw file 805 # Issue #5700: io.FileIO calls flush() after file closed 806 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0) 807 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 808 self.check_flush_error_on_close(fd, 'wb', buffering=0) 809 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 810 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 811 os.close(fd) 812 # buffered io 813 self.check_flush_error_on_close(os_helper.TESTFN, 'wb') 814 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 815 self.check_flush_error_on_close(fd, 'wb') 816 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 817 self.check_flush_error_on_close(fd, 'wb', closefd=False) 818 os.close(fd) 819 # text io 820 self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8") 821 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 822 self.check_flush_error_on_close(fd, 'w', encoding="utf-8") 823 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 824 self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False) 825 os.close(fd) 826 827 def test_multi_close(self): 828 f = self.open(os_helper.TESTFN, "wb", buffering=0) 829 f.close() 830 f.close() 831 f.close() 832 self.assertRaises(ValueError, f.flush) 833 834 def test_RawIOBase_read(self): 835 # Exercise the default limited RawIOBase.read(n) implementation (which 836 # calls readinto() internally). 837 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 838 self.assertEqual(rawio.read(2), b"ab") 839 self.assertEqual(rawio.read(2), b"c") 840 self.assertEqual(rawio.read(2), b"d") 841 self.assertEqual(rawio.read(2), None) 842 self.assertEqual(rawio.read(2), b"ef") 843 self.assertEqual(rawio.read(2), b"g") 844 self.assertEqual(rawio.read(2), None) 845 self.assertEqual(rawio.read(2), b"") 846 847 def test_types_have_dict(self): 848 test = ( 849 self.IOBase(), 850 self.RawIOBase(), 851 self.TextIOBase(), 852 self.StringIO(), 853 self.BytesIO() 854 ) 855 for obj in test: 856 self.assertTrue(hasattr(obj, "__dict__")) 857 858 def test_opener(self): 859 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 860 f.write("egg\n") 861 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 862 def opener(path, flags): 863 return fd 864 with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f: 865 self.assertEqual(f.read(), "egg\n") 866 867 def test_bad_opener_negative_1(self): 868 # Issue #27066. 869 def badopener(fname, flags): 870 return -1 871 with self.assertRaises(ValueError) as cm: 872 open('non-existent', 'r', opener=badopener) 873 self.assertEqual(str(cm.exception), 'opener returned -1') 874 875 def test_bad_opener_other_negative(self): 876 # Issue #27066. 877 def badopener(fname, flags): 878 return -2 879 with self.assertRaises(ValueError) as cm: 880 open('non-existent', 'r', opener=badopener) 881 self.assertEqual(str(cm.exception), 'opener returned -2') 882 883 def test_fileio_closefd(self): 884 # Issue #4841 885 with self.open(__file__, 'rb') as f1, \ 886 self.open(__file__, 'rb') as f2: 887 fileio = self.FileIO(f1.fileno(), closefd=False) 888 # .__init__() must not close f1 889 fileio.__init__(f2.fileno(), closefd=False) 890 f1.readline() 891 # .close() must not close f2 892 fileio.close() 893 f2.readline() 894 895 def test_nonbuffered_textio(self): 896 with warnings_helper.check_no_resource_warning(self): 897 with self.assertRaises(ValueError): 898 self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0) 899 900 def test_invalid_newline(self): 901 with warnings_helper.check_no_resource_warning(self): 902 with self.assertRaises(ValueError): 903 self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid') 904 905 def test_buffered_readinto_mixin(self): 906 # Test the implementation provided by BufferedIOBase 907 class Stream(self.BufferedIOBase): 908 def read(self, size): 909 return b"12345" 910 read1 = read 911 stream = Stream() 912 for method in ("readinto", "readinto1"): 913 with self.subTest(method): 914 buffer = byteslike(5) 915 self.assertEqual(getattr(stream, method)(buffer), 5) 916 self.assertEqual(bytes(buffer), b"12345") 917 918 def test_fspath_support(self): 919 def check_path_succeeds(path): 920 with self.open(path, "w", encoding="utf-8") as f: 921 f.write("egg\n") 922 923 with self.open(path, "r", encoding="utf-8") as f: 924 self.assertEqual(f.read(), "egg\n") 925 926 check_path_succeeds(FakePath(os_helper.TESTFN)) 927 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN))) 928 929 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 930 bad_path = FakePath(f.fileno()) 931 with self.assertRaises(TypeError): 932 self.open(bad_path, 'w', encoding="utf-8") 933 934 bad_path = FakePath(None) 935 with self.assertRaises(TypeError): 936 self.open(bad_path, 'w', encoding="utf-8") 937 938 bad_path = FakePath(FloatingPointError) 939 with self.assertRaises(FloatingPointError): 940 self.open(bad_path, 'w', encoding="utf-8") 941 942 # ensure that refcounting is correct with some error conditions 943 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 944 self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8") 945 946 def test_RawIOBase_readall(self): 947 # Exercise the default unlimited RawIOBase.read() and readall() 948 # implementations. 949 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 950 self.assertEqual(rawio.read(), b"abcdefg") 951 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 952 self.assertEqual(rawio.readall(), b"abcdefg") 953 954 def test_BufferedIOBase_readinto(self): 955 # Exercise the default BufferedIOBase.readinto() and readinto1() 956 # implementations (which call read() or read1() internally). 957 class Reader(self.BufferedIOBase): 958 def __init__(self, avail): 959 self.avail = avail 960 def read(self, size): 961 result = self.avail[:size] 962 self.avail = self.avail[size:] 963 return result 964 def read1(self, size): 965 """Returns no more than 5 bytes at once""" 966 return self.read(min(size, 5)) 967 tests = ( 968 # (test method, total data available, read buffer size, expected 969 # read size) 970 ("readinto", 10, 5, 5), 971 ("readinto", 10, 6, 6), # More than read1() can return 972 ("readinto", 5, 6, 5), # Buffer larger than total available 973 ("readinto", 6, 7, 6), 974 ("readinto", 10, 0, 0), # Empty buffer 975 ("readinto1", 10, 5, 5), # Result limited to single read1() call 976 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 977 ("readinto1", 5, 6, 5), # Buffer larger than total available 978 ("readinto1", 6, 7, 5), 979 ("readinto1", 10, 0, 0), # Empty buffer 980 ) 981 UNUSED_BYTE = 0x81 982 for test in tests: 983 with self.subTest(test): 984 method, avail, request, result = test 985 reader = Reader(bytes(range(avail))) 986 buffer = bytearray((UNUSED_BYTE,) * request) 987 method = getattr(reader, method) 988 self.assertEqual(method(buffer), result) 989 self.assertEqual(len(buffer), request) 990 self.assertSequenceEqual(buffer[:result], range(result)) 991 unused = (UNUSED_BYTE,) * (request - result) 992 self.assertSequenceEqual(buffer[result:], unused) 993 self.assertEqual(len(reader.avail), avail - result) 994 995 def test_close_assert(self): 996 class R(self.IOBase): 997 def __setattr__(self, name, value): 998 pass 999 def flush(self): 1000 raise OSError() 1001 f = R() 1002 # This would cause an assertion failure. 1003 self.assertRaises(OSError, f.close) 1004 1005 # Silence destructor error 1006 R.flush = lambda self: None 1007 1008 1009class CIOTest(IOTest): 1010 1011 def test_IOBase_finalize(self): 1012 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 1013 # class which inherits IOBase and an object of this class are caught 1014 # in a reference cycle and close() is already in the method cache. 1015 class MyIO(self.IOBase): 1016 def close(self): 1017 pass 1018 1019 # create an instance to populate the method cache 1020 MyIO() 1021 obj = MyIO() 1022 obj.obj = obj 1023 wr = weakref.ref(obj) 1024 del MyIO 1025 del obj 1026 support.gc_collect() 1027 self.assertIsNone(wr(), wr) 1028 1029class PyIOTest(IOTest): 1030 pass 1031 1032 1033@support.cpython_only 1034class APIMismatchTest(unittest.TestCase): 1035 1036 def test_RawIOBase_io_in_pyio_match(self): 1037 """Test that pyio RawIOBase class has all c RawIOBase methods""" 1038 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, 1039 ignore=('__weakref__',)) 1040 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') 1041 1042 def test_RawIOBase_pyio_in_io_match(self): 1043 """Test that c RawIOBase class has all pyio RawIOBase methods""" 1044 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) 1045 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') 1046 1047 1048class CommonBufferedTests: 1049 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 1050 1051 def test_detach(self): 1052 raw = self.MockRawIO() 1053 buf = self.tp(raw) 1054 self.assertIs(buf.detach(), raw) 1055 self.assertRaises(ValueError, buf.detach) 1056 1057 repr(buf) # Should still work 1058 1059 def test_fileno(self): 1060 rawio = self.MockRawIO() 1061 bufio = self.tp(rawio) 1062 1063 self.assertEqual(42, bufio.fileno()) 1064 1065 def test_invalid_args(self): 1066 rawio = self.MockRawIO() 1067 bufio = self.tp(rawio) 1068 # Invalid whence 1069 self.assertRaises(ValueError, bufio.seek, 0, -1) 1070 self.assertRaises(ValueError, bufio.seek, 0, 9) 1071 1072 def test_override_destructor(self): 1073 tp = self.tp 1074 record = [] 1075 class MyBufferedIO(tp): 1076 def __del__(self): 1077 record.append(1) 1078 try: 1079 f = super().__del__ 1080 except AttributeError: 1081 pass 1082 else: 1083 f() 1084 def close(self): 1085 record.append(2) 1086 super().close() 1087 def flush(self): 1088 record.append(3) 1089 super().flush() 1090 rawio = self.MockRawIO() 1091 bufio = MyBufferedIO(rawio) 1092 del bufio 1093 support.gc_collect() 1094 self.assertEqual(record, [1, 2, 3]) 1095 1096 def test_context_manager(self): 1097 # Test usability as a context manager 1098 rawio = self.MockRawIO() 1099 bufio = self.tp(rawio) 1100 def _with(): 1101 with bufio: 1102 pass 1103 _with() 1104 # bufio should now be closed, and using it a second time should raise 1105 # a ValueError. 1106 self.assertRaises(ValueError, _with) 1107 1108 def test_error_through_destructor(self): 1109 # Test that the exception state is not modified by a destructor, 1110 # even if close() fails. 1111 rawio = self.CloseFailureIO() 1112 with support.catch_unraisable_exception() as cm: 1113 with self.assertRaises(AttributeError): 1114 self.tp(rawio).xyzzy 1115 1116 if not IOBASE_EMITS_UNRAISABLE: 1117 self.assertIsNone(cm.unraisable) 1118 elif cm.unraisable is not None: 1119 self.assertEqual(cm.unraisable.exc_type, OSError) 1120 1121 def test_repr(self): 1122 raw = self.MockRawIO() 1123 b = self.tp(raw) 1124 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__) 1125 self.assertRegex(repr(b), "<%s>" % clsname) 1126 raw.name = "dummy" 1127 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname) 1128 raw.name = b"dummy" 1129 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname) 1130 1131 def test_recursive_repr(self): 1132 # Issue #25455 1133 raw = self.MockRawIO() 1134 b = self.tp(raw) 1135 with support.swap_attr(raw, 'name', b): 1136 try: 1137 repr(b) # Should not crash 1138 except RuntimeError: 1139 pass 1140 1141 def test_flush_error_on_close(self): 1142 # Test that buffered file is closed despite failed flush 1143 # and that flush() is called before file closed. 1144 raw = self.MockRawIO() 1145 closed = [] 1146 def bad_flush(): 1147 closed[:] = [b.closed, raw.closed] 1148 raise OSError() 1149 raw.flush = bad_flush 1150 b = self.tp(raw) 1151 self.assertRaises(OSError, b.close) # exception not swallowed 1152 self.assertTrue(b.closed) 1153 self.assertTrue(raw.closed) 1154 self.assertTrue(closed) # flush() called 1155 self.assertFalse(closed[0]) # flush() called before file closed 1156 self.assertFalse(closed[1]) 1157 raw.flush = lambda: None # break reference loop 1158 1159 def test_close_error_on_close(self): 1160 raw = self.MockRawIO() 1161 def bad_flush(): 1162 raise OSError('flush') 1163 def bad_close(): 1164 raise OSError('close') 1165 raw.close = bad_close 1166 b = self.tp(raw) 1167 b.flush = bad_flush 1168 with self.assertRaises(OSError) as err: # exception not swallowed 1169 b.close() 1170 self.assertEqual(err.exception.args, ('close',)) 1171 self.assertIsInstance(err.exception.__context__, OSError) 1172 self.assertEqual(err.exception.__context__.args, ('flush',)) 1173 self.assertFalse(b.closed) 1174 1175 # Silence destructor error 1176 raw.close = lambda: None 1177 b.flush = lambda: None 1178 1179 def test_nonnormalized_close_error_on_close(self): 1180 # Issue #21677 1181 raw = self.MockRawIO() 1182 def bad_flush(): 1183 raise non_existing_flush 1184 def bad_close(): 1185 raise non_existing_close 1186 raw.close = bad_close 1187 b = self.tp(raw) 1188 b.flush = bad_flush 1189 with self.assertRaises(NameError) as err: # exception not swallowed 1190 b.close() 1191 self.assertIn('non_existing_close', str(err.exception)) 1192 self.assertIsInstance(err.exception.__context__, NameError) 1193 self.assertIn('non_existing_flush', str(err.exception.__context__)) 1194 self.assertFalse(b.closed) 1195 1196 # Silence destructor error 1197 b.flush = lambda: None 1198 raw.close = lambda: None 1199 1200 def test_multi_close(self): 1201 raw = self.MockRawIO() 1202 b = self.tp(raw) 1203 b.close() 1204 b.close() 1205 b.close() 1206 self.assertRaises(ValueError, b.flush) 1207 1208 def test_unseekable(self): 1209 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1210 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1211 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1212 1213 def test_readonly_attributes(self): 1214 raw = self.MockRawIO() 1215 buf = self.tp(raw) 1216 x = self.MockRawIO() 1217 with self.assertRaises(AttributeError): 1218 buf.raw = x 1219 1220 1221class SizeofTest: 1222 1223 @support.cpython_only 1224 def test_sizeof(self): 1225 bufsize1 = 4096 1226 bufsize2 = 8192 1227 rawio = self.MockRawIO() 1228 bufio = self.tp(rawio, buffer_size=bufsize1) 1229 size = sys.getsizeof(bufio) - bufsize1 1230 rawio = self.MockRawIO() 1231 bufio = self.tp(rawio, buffer_size=bufsize2) 1232 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 1233 1234 @support.cpython_only 1235 def test_buffer_freeing(self) : 1236 bufsize = 4096 1237 rawio = self.MockRawIO() 1238 bufio = self.tp(rawio, buffer_size=bufsize) 1239 size = sys.getsizeof(bufio) - bufsize 1240 bufio.close() 1241 self.assertEqual(sys.getsizeof(bufio), size) 1242 1243class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 1244 read_mode = "rb" 1245 1246 def test_constructor(self): 1247 rawio = self.MockRawIO([b"abc"]) 1248 bufio = self.tp(rawio) 1249 bufio.__init__(rawio) 1250 bufio.__init__(rawio, buffer_size=1024) 1251 bufio.__init__(rawio, buffer_size=16) 1252 self.assertEqual(b"abc", bufio.read()) 1253 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1254 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1255 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1256 rawio = self.MockRawIO([b"abc"]) 1257 bufio.__init__(rawio) 1258 self.assertEqual(b"abc", bufio.read()) 1259 1260 def test_uninitialized(self): 1261 bufio = self.tp.__new__(self.tp) 1262 del bufio 1263 bufio = self.tp.__new__(self.tp) 1264 self.assertRaisesRegex((ValueError, AttributeError), 1265 'uninitialized|has no attribute', 1266 bufio.read, 0) 1267 bufio.__init__(self.MockRawIO()) 1268 self.assertEqual(bufio.read(0), b'') 1269 1270 def test_read(self): 1271 for arg in (None, 7): 1272 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1273 bufio = self.tp(rawio) 1274 self.assertEqual(b"abcdefg", bufio.read(arg)) 1275 # Invalid args 1276 self.assertRaises(ValueError, bufio.read, -2) 1277 1278 def test_read1(self): 1279 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1280 bufio = self.tp(rawio) 1281 self.assertEqual(b"a", bufio.read(1)) 1282 self.assertEqual(b"b", bufio.read1(1)) 1283 self.assertEqual(rawio._reads, 1) 1284 self.assertEqual(b"", bufio.read1(0)) 1285 self.assertEqual(b"c", bufio.read1(100)) 1286 self.assertEqual(rawio._reads, 1) 1287 self.assertEqual(b"d", bufio.read1(100)) 1288 self.assertEqual(rawio._reads, 2) 1289 self.assertEqual(b"efg", bufio.read1(100)) 1290 self.assertEqual(rawio._reads, 3) 1291 self.assertEqual(b"", bufio.read1(100)) 1292 self.assertEqual(rawio._reads, 4) 1293 1294 def test_read1_arbitrary(self): 1295 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1296 bufio = self.tp(rawio) 1297 self.assertEqual(b"a", bufio.read(1)) 1298 self.assertEqual(b"bc", bufio.read1()) 1299 self.assertEqual(b"d", bufio.read1()) 1300 self.assertEqual(b"efg", bufio.read1(-1)) 1301 self.assertEqual(rawio._reads, 3) 1302 self.assertEqual(b"", bufio.read1()) 1303 self.assertEqual(rawio._reads, 4) 1304 1305 def test_readinto(self): 1306 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1307 bufio = self.tp(rawio) 1308 b = bytearray(2) 1309 self.assertEqual(bufio.readinto(b), 2) 1310 self.assertEqual(b, b"ab") 1311 self.assertEqual(bufio.readinto(b), 2) 1312 self.assertEqual(b, b"cd") 1313 self.assertEqual(bufio.readinto(b), 2) 1314 self.assertEqual(b, b"ef") 1315 self.assertEqual(bufio.readinto(b), 1) 1316 self.assertEqual(b, b"gf") 1317 self.assertEqual(bufio.readinto(b), 0) 1318 self.assertEqual(b, b"gf") 1319 rawio = self.MockRawIO((b"abc", None)) 1320 bufio = self.tp(rawio) 1321 self.assertEqual(bufio.readinto(b), 2) 1322 self.assertEqual(b, b"ab") 1323 self.assertEqual(bufio.readinto(b), 1) 1324 self.assertEqual(b, b"cb") 1325 1326 def test_readinto1(self): 1327 buffer_size = 10 1328 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) 1329 bufio = self.tp(rawio, buffer_size=buffer_size) 1330 b = bytearray(2) 1331 self.assertEqual(bufio.peek(3), b'abc') 1332 self.assertEqual(rawio._reads, 1) 1333 self.assertEqual(bufio.readinto1(b), 2) 1334 self.assertEqual(b, b"ab") 1335 self.assertEqual(rawio._reads, 1) 1336 self.assertEqual(bufio.readinto1(b), 1) 1337 self.assertEqual(b[:1], b"c") 1338 self.assertEqual(rawio._reads, 1) 1339 self.assertEqual(bufio.readinto1(b), 2) 1340 self.assertEqual(b, b"de") 1341 self.assertEqual(rawio._reads, 2) 1342 b = bytearray(2*buffer_size) 1343 self.assertEqual(bufio.peek(3), b'fgh') 1344 self.assertEqual(rawio._reads, 3) 1345 self.assertEqual(bufio.readinto1(b), 6) 1346 self.assertEqual(b[:6], b"fghjkl") 1347 self.assertEqual(rawio._reads, 4) 1348 1349 def test_readinto_array(self): 1350 buffer_size = 60 1351 data = b"a" * 26 1352 rawio = self.MockRawIO((data,)) 1353 bufio = self.tp(rawio, buffer_size=buffer_size) 1354 1355 # Create an array with element size > 1 byte 1356 b = array.array('i', b'x' * 32) 1357 assert len(b) != 16 1358 1359 # Read into it. We should get as many *bytes* as we can fit into b 1360 # (which is more than the number of elements) 1361 n = bufio.readinto(b) 1362 self.assertGreater(n, len(b)) 1363 1364 # Check that old contents of b are preserved 1365 bm = memoryview(b).cast('B') 1366 self.assertLess(n, len(bm)) 1367 self.assertEqual(bm[:n], data[:n]) 1368 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1369 1370 def test_readinto1_array(self): 1371 buffer_size = 60 1372 data = b"a" * 26 1373 rawio = self.MockRawIO((data,)) 1374 bufio = self.tp(rawio, buffer_size=buffer_size) 1375 1376 # Create an array with element size > 1 byte 1377 b = array.array('i', b'x' * 32) 1378 assert len(b) != 16 1379 1380 # Read into it. We should get as many *bytes* as we can fit into b 1381 # (which is more than the number of elements) 1382 n = bufio.readinto1(b) 1383 self.assertGreater(n, len(b)) 1384 1385 # Check that old contents of b are preserved 1386 bm = memoryview(b).cast('B') 1387 self.assertLess(n, len(bm)) 1388 self.assertEqual(bm[:n], data[:n]) 1389 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1390 1391 def test_readlines(self): 1392 def bufio(): 1393 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 1394 return self.tp(rawio) 1395 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 1396 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 1397 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 1398 1399 def test_buffering(self): 1400 data = b"abcdefghi" 1401 dlen = len(data) 1402 1403 tests = [ 1404 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 1405 [ 100, [ 3, 3, 3], [ dlen ] ], 1406 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 1407 ] 1408 1409 for bufsize, buf_read_sizes, raw_read_sizes in tests: 1410 rawio = self.MockFileIO(data) 1411 bufio = self.tp(rawio, buffer_size=bufsize) 1412 pos = 0 1413 for nbytes in buf_read_sizes: 1414 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 1415 pos += nbytes 1416 # this is mildly implementation-dependent 1417 self.assertEqual(rawio.read_history, raw_read_sizes) 1418 1419 def test_read_non_blocking(self): 1420 # Inject some None's in there to simulate EWOULDBLOCK 1421 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 1422 bufio = self.tp(rawio) 1423 self.assertEqual(b"abcd", bufio.read(6)) 1424 self.assertEqual(b"e", bufio.read(1)) 1425 self.assertEqual(b"fg", bufio.read()) 1426 self.assertEqual(b"", bufio.peek(1)) 1427 self.assertIsNone(bufio.read()) 1428 self.assertEqual(b"", bufio.read()) 1429 1430 rawio = self.MockRawIO((b"a", None, None)) 1431 self.assertEqual(b"a", rawio.readall()) 1432 self.assertIsNone(rawio.readall()) 1433 1434 def test_read_past_eof(self): 1435 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1436 bufio = self.tp(rawio) 1437 1438 self.assertEqual(b"abcdefg", bufio.read(9000)) 1439 1440 def test_read_all(self): 1441 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1442 bufio = self.tp(rawio) 1443 1444 self.assertEqual(b"abcdefg", bufio.read()) 1445 1446 @support.requires_resource('cpu') 1447 def test_threads(self): 1448 try: 1449 # Write out many bytes with exactly the same number of 0's, 1450 # 1's... 255's. This will help us check that concurrent reading 1451 # doesn't duplicate or forget contents. 1452 N = 1000 1453 l = list(range(256)) * N 1454 random.shuffle(l) 1455 s = bytes(bytearray(l)) 1456 with self.open(os_helper.TESTFN, "wb") as f: 1457 f.write(s) 1458 with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw: 1459 bufio = self.tp(raw, 8) 1460 errors = [] 1461 results = [] 1462 def f(): 1463 try: 1464 # Intra-buffer read then buffer-flushing read 1465 for n in cycle([1, 19]): 1466 s = bufio.read(n) 1467 if not s: 1468 break 1469 # list.append() is atomic 1470 results.append(s) 1471 except Exception as e: 1472 errors.append(e) 1473 raise 1474 threads = [threading.Thread(target=f) for x in range(20)] 1475 with threading_helper.start_threads(threads): 1476 time.sleep(0.02) # yield 1477 self.assertFalse(errors, 1478 "the following exceptions were caught: %r" % errors) 1479 s = b''.join(results) 1480 for i in range(256): 1481 c = bytes(bytearray([i])) 1482 self.assertEqual(s.count(c), N) 1483 finally: 1484 os_helper.unlink(os_helper.TESTFN) 1485 1486 def test_unseekable(self): 1487 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1488 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1489 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1490 bufio.read(1) 1491 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1492 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1493 1494 def test_misbehaved_io(self): 1495 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1496 bufio = self.tp(rawio) 1497 self.assertRaises(OSError, bufio.seek, 0) 1498 self.assertRaises(OSError, bufio.tell) 1499 1500 # Silence destructor error 1501 bufio.close = lambda: None 1502 1503 def test_no_extraneous_read(self): 1504 # Issue #9550; when the raw IO object has satisfied the read request, 1505 # we should not issue any additional reads, otherwise it may block 1506 # (e.g. socket). 1507 bufsize = 16 1508 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1509 rawio = self.MockRawIO([b"x" * n]) 1510 bufio = self.tp(rawio, bufsize) 1511 self.assertEqual(bufio.read(n), b"x" * n) 1512 # Simple case: one raw read is enough to satisfy the request. 1513 self.assertEqual(rawio._extraneous_reads, 0, 1514 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1515 # A more complex case where two raw reads are needed to satisfy 1516 # the request. 1517 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1518 bufio = self.tp(rawio, bufsize) 1519 self.assertEqual(bufio.read(n), b"x" * n) 1520 self.assertEqual(rawio._extraneous_reads, 0, 1521 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1522 1523 def test_read_on_closed(self): 1524 # Issue #23796 1525 b = io.BufferedReader(io.BytesIO(b"12")) 1526 b.read(1) 1527 b.close() 1528 self.assertRaises(ValueError, b.peek) 1529 self.assertRaises(ValueError, b.read1, 1) 1530 1531 def test_truncate_on_read_only(self): 1532 rawio = self.MockFileIO(b"abc") 1533 bufio = self.tp(rawio) 1534 self.assertFalse(bufio.writable()) 1535 self.assertRaises(self.UnsupportedOperation, bufio.truncate) 1536 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0) 1537 1538 1539class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1540 tp = io.BufferedReader 1541 1542 @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing " 1543 "instead of returning NULL for malloc failure.") 1544 def test_constructor(self): 1545 BufferedReaderTest.test_constructor(self) 1546 # The allocation can succeed on 32-bit builds, e.g. with more 1547 # than 2 GiB RAM and a 64-bit kernel. 1548 if sys.maxsize > 0x7FFFFFFF: 1549 rawio = self.MockRawIO() 1550 bufio = self.tp(rawio) 1551 self.assertRaises((OverflowError, MemoryError, ValueError), 1552 bufio.__init__, rawio, sys.maxsize) 1553 1554 def test_initialization(self): 1555 rawio = self.MockRawIO([b"abc"]) 1556 bufio = self.tp(rawio) 1557 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1558 self.assertRaises(ValueError, bufio.read) 1559 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1560 self.assertRaises(ValueError, bufio.read) 1561 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1562 self.assertRaises(ValueError, bufio.read) 1563 1564 def test_misbehaved_io_read(self): 1565 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1566 bufio = self.tp(rawio) 1567 # _pyio.BufferedReader seems to implement reading different, so that 1568 # checking this is not so easy. 1569 self.assertRaises(OSError, bufio.read, 10) 1570 1571 def test_garbage_collection(self): 1572 # C BufferedReader objects are collected. 1573 # The Python version has __del__, so it ends into gc.garbage instead 1574 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1575 with warnings_helper.check_warnings(('', ResourceWarning)): 1576 rawio = self.FileIO(os_helper.TESTFN, "w+b") 1577 f = self.tp(rawio) 1578 f.f = f 1579 wr = weakref.ref(f) 1580 del f 1581 support.gc_collect() 1582 self.assertIsNone(wr(), wr) 1583 1584 def test_args_error(self): 1585 # Issue #17275 1586 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1587 self.tp(io.BytesIO(), 1024, 1024, 1024) 1588 1589 def test_bad_readinto_value(self): 1590 rawio = io.BufferedReader(io.BytesIO(b"12")) 1591 rawio.readinto = lambda buf: -1 1592 bufio = self.tp(rawio) 1593 with self.assertRaises(OSError) as cm: 1594 bufio.readline() 1595 self.assertIsNone(cm.exception.__cause__) 1596 1597 def test_bad_readinto_type(self): 1598 rawio = io.BufferedReader(io.BytesIO(b"12")) 1599 rawio.readinto = lambda buf: b'' 1600 bufio = self.tp(rawio) 1601 with self.assertRaises(OSError) as cm: 1602 bufio.readline() 1603 self.assertIsInstance(cm.exception.__cause__, TypeError) 1604 1605 1606class PyBufferedReaderTest(BufferedReaderTest): 1607 tp = pyio.BufferedReader 1608 1609 1610class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1611 write_mode = "wb" 1612 1613 def test_constructor(self): 1614 rawio = self.MockRawIO() 1615 bufio = self.tp(rawio) 1616 bufio.__init__(rawio) 1617 bufio.__init__(rawio, buffer_size=1024) 1618 bufio.__init__(rawio, buffer_size=16) 1619 self.assertEqual(3, bufio.write(b"abc")) 1620 bufio.flush() 1621 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1622 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1623 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1624 bufio.__init__(rawio) 1625 self.assertEqual(3, bufio.write(b"ghi")) 1626 bufio.flush() 1627 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1628 1629 def test_uninitialized(self): 1630 bufio = self.tp.__new__(self.tp) 1631 del bufio 1632 bufio = self.tp.__new__(self.tp) 1633 self.assertRaisesRegex((ValueError, AttributeError), 1634 'uninitialized|has no attribute', 1635 bufio.write, b'') 1636 bufio.__init__(self.MockRawIO()) 1637 self.assertEqual(bufio.write(b''), 0) 1638 1639 def test_detach_flush(self): 1640 raw = self.MockRawIO() 1641 buf = self.tp(raw) 1642 buf.write(b"howdy!") 1643 self.assertFalse(raw._write_stack) 1644 buf.detach() 1645 self.assertEqual(raw._write_stack, [b"howdy!"]) 1646 1647 def test_write(self): 1648 # Write to the buffered IO but don't overflow the buffer. 1649 writer = self.MockRawIO() 1650 bufio = self.tp(writer, 8) 1651 bufio.write(b"abc") 1652 self.assertFalse(writer._write_stack) 1653 buffer = bytearray(b"def") 1654 bufio.write(buffer) 1655 buffer[:] = b"***" # Overwrite our copy of the data 1656 bufio.flush() 1657 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1658 1659 def test_write_overflow(self): 1660 writer = self.MockRawIO() 1661 bufio = self.tp(writer, 8) 1662 contents = b"abcdefghijklmnop" 1663 for n in range(0, len(contents), 3): 1664 bufio.write(contents[n:n+3]) 1665 flushed = b"".join(writer._write_stack) 1666 # At least (total - 8) bytes were implicitly flushed, perhaps more 1667 # depending on the implementation. 1668 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1669 1670 def check_writes(self, intermediate_func): 1671 # Lots of writes, test the flushed output is as expected. 1672 contents = bytes(range(256)) * 1000 1673 n = 0 1674 writer = self.MockRawIO() 1675 bufio = self.tp(writer, 13) 1676 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1677 def gen_sizes(): 1678 for size in count(1): 1679 for i in range(15): 1680 yield size 1681 sizes = gen_sizes() 1682 while n < len(contents): 1683 size = min(next(sizes), len(contents) - n) 1684 self.assertEqual(bufio.write(contents[n:n+size]), size) 1685 intermediate_func(bufio) 1686 n += size 1687 bufio.flush() 1688 self.assertEqual(contents, b"".join(writer._write_stack)) 1689 1690 def test_writes(self): 1691 self.check_writes(lambda bufio: None) 1692 1693 def test_writes_and_flushes(self): 1694 self.check_writes(lambda bufio: bufio.flush()) 1695 1696 def test_writes_and_seeks(self): 1697 def _seekabs(bufio): 1698 pos = bufio.tell() 1699 bufio.seek(pos + 1, 0) 1700 bufio.seek(pos - 1, 0) 1701 bufio.seek(pos, 0) 1702 self.check_writes(_seekabs) 1703 def _seekrel(bufio): 1704 pos = bufio.seek(0, 1) 1705 bufio.seek(+1, 1) 1706 bufio.seek(-1, 1) 1707 bufio.seek(pos, 0) 1708 self.check_writes(_seekrel) 1709 1710 def test_writes_and_truncates(self): 1711 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1712 1713 def test_write_non_blocking(self): 1714 raw = self.MockNonBlockWriterIO() 1715 bufio = self.tp(raw, 8) 1716 1717 self.assertEqual(bufio.write(b"abcd"), 4) 1718 self.assertEqual(bufio.write(b"efghi"), 5) 1719 # 1 byte will be written, the rest will be buffered 1720 raw.block_on(b"k") 1721 self.assertEqual(bufio.write(b"jklmn"), 5) 1722 1723 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1724 raw.block_on(b"0") 1725 try: 1726 bufio.write(b"opqrwxyz0123456789") 1727 except self.BlockingIOError as e: 1728 written = e.characters_written 1729 else: 1730 self.fail("BlockingIOError should have been raised") 1731 self.assertEqual(written, 16) 1732 self.assertEqual(raw.pop_written(), 1733 b"abcdefghijklmnopqrwxyz") 1734 1735 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1736 s = raw.pop_written() 1737 # Previously buffered bytes were flushed 1738 self.assertTrue(s.startswith(b"01234567A"), s) 1739 1740 def test_write_and_rewind(self): 1741 raw = io.BytesIO() 1742 bufio = self.tp(raw, 4) 1743 self.assertEqual(bufio.write(b"abcdef"), 6) 1744 self.assertEqual(bufio.tell(), 6) 1745 bufio.seek(0, 0) 1746 self.assertEqual(bufio.write(b"XY"), 2) 1747 bufio.seek(6, 0) 1748 self.assertEqual(raw.getvalue(), b"XYcdef") 1749 self.assertEqual(bufio.write(b"123456"), 6) 1750 bufio.flush() 1751 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1752 1753 def test_flush(self): 1754 writer = self.MockRawIO() 1755 bufio = self.tp(writer, 8) 1756 bufio.write(b"abc") 1757 bufio.flush() 1758 self.assertEqual(b"abc", writer._write_stack[0]) 1759 1760 def test_writelines(self): 1761 l = [b'ab', b'cd', b'ef'] 1762 writer = self.MockRawIO() 1763 bufio = self.tp(writer, 8) 1764 bufio.writelines(l) 1765 bufio.flush() 1766 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1767 1768 def test_writelines_userlist(self): 1769 l = UserList([b'ab', b'cd', b'ef']) 1770 writer = self.MockRawIO() 1771 bufio = self.tp(writer, 8) 1772 bufio.writelines(l) 1773 bufio.flush() 1774 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1775 1776 def test_writelines_error(self): 1777 writer = self.MockRawIO() 1778 bufio = self.tp(writer, 8) 1779 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1780 self.assertRaises(TypeError, bufio.writelines, None) 1781 self.assertRaises(TypeError, bufio.writelines, 'abc') 1782 1783 def test_destructor(self): 1784 writer = self.MockRawIO() 1785 bufio = self.tp(writer, 8) 1786 bufio.write(b"abc") 1787 del bufio 1788 support.gc_collect() 1789 self.assertEqual(b"abc", writer._write_stack[0]) 1790 1791 def test_truncate(self): 1792 # Truncate implicitly flushes the buffer. 1793 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1794 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw: 1795 bufio = self.tp(raw, 8) 1796 bufio.write(b"abcdef") 1797 self.assertEqual(bufio.truncate(3), 3) 1798 self.assertEqual(bufio.tell(), 6) 1799 with self.open(os_helper.TESTFN, "rb", buffering=0) as f: 1800 self.assertEqual(f.read(), b"abc") 1801 1802 def test_truncate_after_write(self): 1803 # Ensure that truncate preserves the file position after 1804 # writes longer than the buffer size. 1805 # Issue: https://bugs.python.org/issue32228 1806 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1807 with self.open(os_helper.TESTFN, "wb") as f: 1808 # Fill with some buffer 1809 f.write(b'\x00' * 10000) 1810 buffer_sizes = [8192, 4096, 200] 1811 for buffer_size in buffer_sizes: 1812 with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f: 1813 f.write(b'\x00' * (buffer_size + 1)) 1814 # After write write_pos and write_end are set to 0 1815 f.read(1) 1816 # read operation makes sure that pos != raw_pos 1817 f.truncate() 1818 self.assertEqual(f.tell(), buffer_size + 2) 1819 1820 @support.requires_resource('cpu') 1821 def test_threads(self): 1822 try: 1823 # Write out many bytes from many threads and test they were 1824 # all flushed. 1825 N = 1000 1826 contents = bytes(range(256)) * N 1827 sizes = cycle([1, 19]) 1828 n = 0 1829 queue = deque() 1830 while n < len(contents): 1831 size = next(sizes) 1832 queue.append(contents[n:n+size]) 1833 n += size 1834 del contents 1835 # We use a real file object because it allows us to 1836 # exercise situations where the GIL is released before 1837 # writing the buffer to the raw streams. This is in addition 1838 # to concurrency issues due to switching threads in the middle 1839 # of Python code. 1840 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw: 1841 bufio = self.tp(raw, 8) 1842 errors = [] 1843 def f(): 1844 try: 1845 while True: 1846 try: 1847 s = queue.popleft() 1848 except IndexError: 1849 return 1850 bufio.write(s) 1851 except Exception as e: 1852 errors.append(e) 1853 raise 1854 threads = [threading.Thread(target=f) for x in range(20)] 1855 with threading_helper.start_threads(threads): 1856 time.sleep(0.02) # yield 1857 self.assertFalse(errors, 1858 "the following exceptions were caught: %r" % errors) 1859 bufio.close() 1860 with self.open(os_helper.TESTFN, "rb") as f: 1861 s = f.read() 1862 for i in range(256): 1863 self.assertEqual(s.count(bytes([i])), N) 1864 finally: 1865 os_helper.unlink(os_helper.TESTFN) 1866 1867 def test_misbehaved_io(self): 1868 rawio = self.MisbehavedRawIO() 1869 bufio = self.tp(rawio, 5) 1870 self.assertRaises(OSError, bufio.seek, 0) 1871 self.assertRaises(OSError, bufio.tell) 1872 self.assertRaises(OSError, bufio.write, b"abcdef") 1873 1874 # Silence destructor error 1875 bufio.close = lambda: None 1876 1877 def test_max_buffer_size_removal(self): 1878 with self.assertRaises(TypeError): 1879 self.tp(self.MockRawIO(), 8, 12) 1880 1881 def test_write_error_on_close(self): 1882 raw = self.MockRawIO() 1883 def bad_write(b): 1884 raise OSError() 1885 raw.write = bad_write 1886 b = self.tp(raw) 1887 b.write(b'spam') 1888 self.assertRaises(OSError, b.close) # exception not swallowed 1889 self.assertTrue(b.closed) 1890 1891 def test_slow_close_from_thread(self): 1892 # Issue #31976 1893 rawio = self.SlowFlushRawIO() 1894 bufio = self.tp(rawio, 8) 1895 t = threading.Thread(target=bufio.close) 1896 t.start() 1897 rawio.in_flush.wait() 1898 self.assertRaises(ValueError, bufio.write, b'spam') 1899 self.assertTrue(bufio.closed) 1900 t.join() 1901 1902 1903 1904class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1905 tp = io.BufferedWriter 1906 1907 @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing " 1908 "instead of returning NULL for malloc failure.") 1909 def test_constructor(self): 1910 BufferedWriterTest.test_constructor(self) 1911 # The allocation can succeed on 32-bit builds, e.g. with more 1912 # than 2 GiB RAM and a 64-bit kernel. 1913 if sys.maxsize > 0x7FFFFFFF: 1914 rawio = self.MockRawIO() 1915 bufio = self.tp(rawio) 1916 self.assertRaises((OverflowError, MemoryError, ValueError), 1917 bufio.__init__, rawio, sys.maxsize) 1918 1919 def test_initialization(self): 1920 rawio = self.MockRawIO() 1921 bufio = self.tp(rawio) 1922 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1923 self.assertRaises(ValueError, bufio.write, b"def") 1924 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1925 self.assertRaises(ValueError, bufio.write, b"def") 1926 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1927 self.assertRaises(ValueError, bufio.write, b"def") 1928 1929 def test_garbage_collection(self): 1930 # C BufferedWriter objects are collected, and collecting them flushes 1931 # all data to disk. 1932 # The Python version has __del__, so it ends into gc.garbage instead 1933 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1934 with warnings_helper.check_warnings(('', ResourceWarning)): 1935 rawio = self.FileIO(os_helper.TESTFN, "w+b") 1936 f = self.tp(rawio) 1937 f.write(b"123xxx") 1938 f.x = f 1939 wr = weakref.ref(f) 1940 del f 1941 support.gc_collect() 1942 self.assertIsNone(wr(), wr) 1943 with self.open(os_helper.TESTFN, "rb") as f: 1944 self.assertEqual(f.read(), b"123xxx") 1945 1946 def test_args_error(self): 1947 # Issue #17275 1948 with self.assertRaisesRegex(TypeError, "BufferedWriter"): 1949 self.tp(io.BytesIO(), 1024, 1024, 1024) 1950 1951 1952class PyBufferedWriterTest(BufferedWriterTest): 1953 tp = pyio.BufferedWriter 1954 1955class BufferedRWPairTest(unittest.TestCase): 1956 1957 def test_constructor(self): 1958 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1959 self.assertFalse(pair.closed) 1960 1961 def test_uninitialized(self): 1962 pair = self.tp.__new__(self.tp) 1963 del pair 1964 pair = self.tp.__new__(self.tp) 1965 self.assertRaisesRegex((ValueError, AttributeError), 1966 'uninitialized|has no attribute', 1967 pair.read, 0) 1968 self.assertRaisesRegex((ValueError, AttributeError), 1969 'uninitialized|has no attribute', 1970 pair.write, b'') 1971 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1972 self.assertEqual(pair.read(0), b'') 1973 self.assertEqual(pair.write(b''), 0) 1974 1975 def test_detach(self): 1976 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1977 self.assertRaises(self.UnsupportedOperation, pair.detach) 1978 1979 def test_constructor_max_buffer_size_removal(self): 1980 with self.assertRaises(TypeError): 1981 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1982 1983 def test_constructor_with_not_readable(self): 1984 class NotReadable(MockRawIO): 1985 def readable(self): 1986 return False 1987 1988 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO()) 1989 1990 def test_constructor_with_not_writeable(self): 1991 class NotWriteable(MockRawIO): 1992 def writable(self): 1993 return False 1994 1995 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable()) 1996 1997 def test_read(self): 1998 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1999 2000 self.assertEqual(pair.read(3), b"abc") 2001 self.assertEqual(pair.read(1), b"d") 2002 self.assertEqual(pair.read(), b"ef") 2003 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 2004 self.assertEqual(pair.read(None), b"abc") 2005 2006 def test_readlines(self): 2007 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 2008 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 2009 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 2010 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 2011 2012 def test_read1(self): 2013 # .read1() is delegated to the underlying reader object, so this test 2014 # can be shallow. 2015 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2016 2017 self.assertEqual(pair.read1(3), b"abc") 2018 self.assertEqual(pair.read1(), b"def") 2019 2020 def test_readinto(self): 2021 for method in ("readinto", "readinto1"): 2022 with self.subTest(method): 2023 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2024 2025 data = byteslike(b'\0' * 5) 2026 self.assertEqual(getattr(pair, method)(data), 5) 2027 self.assertEqual(bytes(data), b"abcde") 2028 2029 def test_write(self): 2030 w = self.MockRawIO() 2031 pair = self.tp(self.MockRawIO(), w) 2032 2033 pair.write(b"abc") 2034 pair.flush() 2035 buffer = bytearray(b"def") 2036 pair.write(buffer) 2037 buffer[:] = b"***" # Overwrite our copy of the data 2038 pair.flush() 2039 self.assertEqual(w._write_stack, [b"abc", b"def"]) 2040 2041 def test_peek(self): 2042 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2043 2044 self.assertTrue(pair.peek(3).startswith(b"abc")) 2045 self.assertEqual(pair.read(3), b"abc") 2046 2047 def test_readable(self): 2048 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2049 self.assertTrue(pair.readable()) 2050 2051 def test_writeable(self): 2052 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2053 self.assertTrue(pair.writable()) 2054 2055 def test_seekable(self): 2056 # BufferedRWPairs are never seekable, even if their readers and writers 2057 # are. 2058 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2059 self.assertFalse(pair.seekable()) 2060 2061 # .flush() is delegated to the underlying writer object and has been 2062 # tested in the test_write method. 2063 2064 def test_close_and_closed(self): 2065 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2066 self.assertFalse(pair.closed) 2067 pair.close() 2068 self.assertTrue(pair.closed) 2069 2070 def test_reader_close_error_on_close(self): 2071 def reader_close(): 2072 reader_non_existing 2073 reader = self.MockRawIO() 2074 reader.close = reader_close 2075 writer = self.MockRawIO() 2076 pair = self.tp(reader, writer) 2077 with self.assertRaises(NameError) as err: 2078 pair.close() 2079 self.assertIn('reader_non_existing', str(err.exception)) 2080 self.assertTrue(pair.closed) 2081 self.assertFalse(reader.closed) 2082 self.assertTrue(writer.closed) 2083 2084 # Silence destructor error 2085 reader.close = lambda: None 2086 2087 def test_writer_close_error_on_close(self): 2088 def writer_close(): 2089 writer_non_existing 2090 reader = self.MockRawIO() 2091 writer = self.MockRawIO() 2092 writer.close = writer_close 2093 pair = self.tp(reader, writer) 2094 with self.assertRaises(NameError) as err: 2095 pair.close() 2096 self.assertIn('writer_non_existing', str(err.exception)) 2097 self.assertFalse(pair.closed) 2098 self.assertTrue(reader.closed) 2099 self.assertFalse(writer.closed) 2100 2101 # Silence destructor error 2102 writer.close = lambda: None 2103 writer = None 2104 2105 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception 2106 with support.catch_unraisable_exception(): 2107 # Ignore BufferedRWPair unraisable exception 2108 with support.catch_unraisable_exception(): 2109 pair = None 2110 support.gc_collect() 2111 support.gc_collect() 2112 2113 def test_reader_writer_close_error_on_close(self): 2114 def reader_close(): 2115 reader_non_existing 2116 def writer_close(): 2117 writer_non_existing 2118 reader = self.MockRawIO() 2119 reader.close = reader_close 2120 writer = self.MockRawIO() 2121 writer.close = writer_close 2122 pair = self.tp(reader, writer) 2123 with self.assertRaises(NameError) as err: 2124 pair.close() 2125 self.assertIn('reader_non_existing', str(err.exception)) 2126 self.assertIsInstance(err.exception.__context__, NameError) 2127 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2128 self.assertFalse(pair.closed) 2129 self.assertFalse(reader.closed) 2130 self.assertFalse(writer.closed) 2131 2132 # Silence destructor error 2133 reader.close = lambda: None 2134 writer.close = lambda: None 2135 2136 def test_isatty(self): 2137 class SelectableIsAtty(MockRawIO): 2138 def __init__(self, isatty): 2139 MockRawIO.__init__(self) 2140 self._isatty = isatty 2141 2142 def isatty(self): 2143 return self._isatty 2144 2145 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2146 self.assertFalse(pair.isatty()) 2147 2148 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2149 self.assertTrue(pair.isatty()) 2150 2151 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2152 self.assertTrue(pair.isatty()) 2153 2154 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2155 self.assertTrue(pair.isatty()) 2156 2157 def test_weakref_clearing(self): 2158 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2159 ref = weakref.ref(brw) 2160 brw = None 2161 ref = None # Shouldn't segfault. 2162 2163class CBufferedRWPairTest(BufferedRWPairTest): 2164 tp = io.BufferedRWPair 2165 2166class PyBufferedRWPairTest(BufferedRWPairTest): 2167 tp = pyio.BufferedRWPair 2168 2169 2170class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 2171 read_mode = "rb+" 2172 write_mode = "wb+" 2173 2174 def test_constructor(self): 2175 BufferedReaderTest.test_constructor(self) 2176 BufferedWriterTest.test_constructor(self) 2177 2178 def test_uninitialized(self): 2179 BufferedReaderTest.test_uninitialized(self) 2180 BufferedWriterTest.test_uninitialized(self) 2181 2182 def test_read_and_write(self): 2183 raw = self.MockRawIO((b"asdf", b"ghjk")) 2184 rw = self.tp(raw, 8) 2185 2186 self.assertEqual(b"as", rw.read(2)) 2187 rw.write(b"ddd") 2188 rw.write(b"eee") 2189 self.assertFalse(raw._write_stack) # Buffer writes 2190 self.assertEqual(b"ghjk", rw.read()) 2191 self.assertEqual(b"dddeee", raw._write_stack[0]) 2192 2193 def test_seek_and_tell(self): 2194 raw = self.BytesIO(b"asdfghjkl") 2195 rw = self.tp(raw) 2196 2197 self.assertEqual(b"as", rw.read(2)) 2198 self.assertEqual(2, rw.tell()) 2199 rw.seek(0, 0) 2200 self.assertEqual(b"asdf", rw.read(4)) 2201 2202 rw.write(b"123f") 2203 rw.seek(0, 0) 2204 self.assertEqual(b"asdf123fl", rw.read()) 2205 self.assertEqual(9, rw.tell()) 2206 rw.seek(-4, 2) 2207 self.assertEqual(5, rw.tell()) 2208 rw.seek(2, 1) 2209 self.assertEqual(7, rw.tell()) 2210 self.assertEqual(b"fl", rw.read(11)) 2211 rw.flush() 2212 self.assertEqual(b"asdf123fl", raw.getvalue()) 2213 2214 self.assertRaises(TypeError, rw.seek, 0.0) 2215 2216 def check_flush_and_read(self, read_func): 2217 raw = self.BytesIO(b"abcdefghi") 2218 bufio = self.tp(raw) 2219 2220 self.assertEqual(b"ab", read_func(bufio, 2)) 2221 bufio.write(b"12") 2222 self.assertEqual(b"ef", read_func(bufio, 2)) 2223 self.assertEqual(6, bufio.tell()) 2224 bufio.flush() 2225 self.assertEqual(6, bufio.tell()) 2226 self.assertEqual(b"ghi", read_func(bufio)) 2227 raw.seek(0, 0) 2228 raw.write(b"XYZ") 2229 # flush() resets the read buffer 2230 bufio.flush() 2231 bufio.seek(0, 0) 2232 self.assertEqual(b"XYZ", read_func(bufio, 3)) 2233 2234 def test_flush_and_read(self): 2235 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 2236 2237 def test_flush_and_readinto(self): 2238 def _readinto(bufio, n=-1): 2239 b = bytearray(n if n >= 0 else 9999) 2240 n = bufio.readinto(b) 2241 return bytes(b[:n]) 2242 self.check_flush_and_read(_readinto) 2243 2244 def test_flush_and_peek(self): 2245 def _peek(bufio, n=-1): 2246 # This relies on the fact that the buffer can contain the whole 2247 # raw stream, otherwise peek() can return less. 2248 b = bufio.peek(n) 2249 if n != -1: 2250 b = b[:n] 2251 bufio.seek(len(b), 1) 2252 return b 2253 self.check_flush_and_read(_peek) 2254 2255 def test_flush_and_write(self): 2256 raw = self.BytesIO(b"abcdefghi") 2257 bufio = self.tp(raw) 2258 2259 bufio.write(b"123") 2260 bufio.flush() 2261 bufio.write(b"45") 2262 bufio.flush() 2263 bufio.seek(0, 0) 2264 self.assertEqual(b"12345fghi", raw.getvalue()) 2265 self.assertEqual(b"12345fghi", bufio.read()) 2266 2267 def test_threads(self): 2268 BufferedReaderTest.test_threads(self) 2269 BufferedWriterTest.test_threads(self) 2270 2271 def test_writes_and_peek(self): 2272 def _peek(bufio): 2273 bufio.peek(1) 2274 self.check_writes(_peek) 2275 def _peek(bufio): 2276 pos = bufio.tell() 2277 bufio.seek(-1, 1) 2278 bufio.peek(1) 2279 bufio.seek(pos, 0) 2280 self.check_writes(_peek) 2281 2282 def test_writes_and_reads(self): 2283 def _read(bufio): 2284 bufio.seek(-1, 1) 2285 bufio.read(1) 2286 self.check_writes(_read) 2287 2288 def test_writes_and_read1s(self): 2289 def _read1(bufio): 2290 bufio.seek(-1, 1) 2291 bufio.read1(1) 2292 self.check_writes(_read1) 2293 2294 def test_writes_and_readintos(self): 2295 def _read(bufio): 2296 bufio.seek(-1, 1) 2297 bufio.readinto(bytearray(1)) 2298 self.check_writes(_read) 2299 2300 def test_write_after_readahead(self): 2301 # Issue #6629: writing after the buffer was filled by readahead should 2302 # first rewind the raw stream. 2303 for overwrite_size in [1, 5]: 2304 raw = self.BytesIO(b"A" * 10) 2305 bufio = self.tp(raw, 4) 2306 # Trigger readahead 2307 self.assertEqual(bufio.read(1), b"A") 2308 self.assertEqual(bufio.tell(), 1) 2309 # Overwriting should rewind the raw stream if it needs so 2310 bufio.write(b"B" * overwrite_size) 2311 self.assertEqual(bufio.tell(), overwrite_size + 1) 2312 # If the write size was smaller than the buffer size, flush() and 2313 # check that rewind happens. 2314 bufio.flush() 2315 self.assertEqual(bufio.tell(), overwrite_size + 1) 2316 s = raw.getvalue() 2317 self.assertEqual(s, 2318 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 2319 2320 def test_write_rewind_write(self): 2321 # Various combinations of reading / writing / seeking backwards / writing again 2322 def mutate(bufio, pos1, pos2): 2323 assert pos2 >= pos1 2324 # Fill the buffer 2325 bufio.seek(pos1) 2326 bufio.read(pos2 - pos1) 2327 bufio.write(b'\x02') 2328 # This writes earlier than the previous write, but still inside 2329 # the buffer. 2330 bufio.seek(pos1) 2331 bufio.write(b'\x01') 2332 2333 b = b"\x80\x81\x82\x83\x84" 2334 for i in range(0, len(b)): 2335 for j in range(i, len(b)): 2336 raw = self.BytesIO(b) 2337 bufio = self.tp(raw, 100) 2338 mutate(bufio, i, j) 2339 bufio.flush() 2340 expected = bytearray(b) 2341 expected[j] = 2 2342 expected[i] = 1 2343 self.assertEqual(raw.getvalue(), expected, 2344 "failed result for i=%d, j=%d" % (i, j)) 2345 2346 def test_truncate_after_read_or_write(self): 2347 raw = self.BytesIO(b"A" * 10) 2348 bufio = self.tp(raw, 100) 2349 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 2350 self.assertEqual(bufio.truncate(), 2) 2351 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 2352 self.assertEqual(bufio.truncate(), 4) 2353 2354 def test_misbehaved_io(self): 2355 BufferedReaderTest.test_misbehaved_io(self) 2356 BufferedWriterTest.test_misbehaved_io(self) 2357 2358 def test_interleaved_read_write(self): 2359 # Test for issue #12213 2360 with self.BytesIO(b'abcdefgh') as raw: 2361 with self.tp(raw, 100) as f: 2362 f.write(b"1") 2363 self.assertEqual(f.read(1), b'b') 2364 f.write(b'2') 2365 self.assertEqual(f.read1(1), b'd') 2366 f.write(b'3') 2367 buf = bytearray(1) 2368 f.readinto(buf) 2369 self.assertEqual(buf, b'f') 2370 f.write(b'4') 2371 self.assertEqual(f.peek(1), b'h') 2372 f.flush() 2373 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 2374 2375 with self.BytesIO(b'abc') as raw: 2376 with self.tp(raw, 100) as f: 2377 self.assertEqual(f.read(1), b'a') 2378 f.write(b"2") 2379 self.assertEqual(f.read(1), b'c') 2380 f.flush() 2381 self.assertEqual(raw.getvalue(), b'a2c') 2382 2383 def test_interleaved_readline_write(self): 2384 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 2385 with self.tp(raw) as f: 2386 f.write(b'1') 2387 self.assertEqual(f.readline(), b'b\n') 2388 f.write(b'2') 2389 self.assertEqual(f.readline(), b'def\n') 2390 f.write(b'3') 2391 self.assertEqual(f.readline(), b'\n') 2392 f.flush() 2393 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 2394 2395 # You can't construct a BufferedRandom over a non-seekable stream. 2396 test_unseekable = None 2397 2398 # writable() returns True, so there's no point to test it over 2399 # a writable stream. 2400 test_truncate_on_read_only = None 2401 2402 2403class CBufferedRandomTest(BufferedRandomTest, SizeofTest): 2404 tp = io.BufferedRandom 2405 2406 @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing " 2407 "instead of returning NULL for malloc failure.") 2408 def test_constructor(self): 2409 BufferedRandomTest.test_constructor(self) 2410 # The allocation can succeed on 32-bit builds, e.g. with more 2411 # than 2 GiB RAM and a 64-bit kernel. 2412 if sys.maxsize > 0x7FFFFFFF: 2413 rawio = self.MockRawIO() 2414 bufio = self.tp(rawio) 2415 self.assertRaises((OverflowError, MemoryError, ValueError), 2416 bufio.__init__, rawio, sys.maxsize) 2417 2418 def test_garbage_collection(self): 2419 CBufferedReaderTest.test_garbage_collection(self) 2420 CBufferedWriterTest.test_garbage_collection(self) 2421 2422 def test_args_error(self): 2423 # Issue #17275 2424 with self.assertRaisesRegex(TypeError, "BufferedRandom"): 2425 self.tp(io.BytesIO(), 1024, 1024, 1024) 2426 2427 2428class PyBufferedRandomTest(BufferedRandomTest): 2429 tp = pyio.BufferedRandom 2430 2431 2432# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 2433# properties: 2434# - A single output character can correspond to many bytes of input. 2435# - The number of input bytes to complete the character can be 2436# undetermined until the last input byte is received. 2437# - The number of input bytes can vary depending on previous input. 2438# - A single input byte can correspond to many characters of output. 2439# - The number of output characters can be undetermined until the 2440# last input byte is received. 2441# - The number of output characters can vary depending on previous input. 2442 2443class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2444 """ 2445 For testing seek/tell behavior with a stateful, buffering decoder. 2446 2447 Input is a sequence of words. Words may be fixed-length (length set 2448 by input) or variable-length (period-terminated). In variable-length 2449 mode, extra periods are ignored. Possible words are: 2450 - 'i' followed by a number sets the input length, I (maximum 99). 2451 When I is set to 0, words are space-terminated. 2452 - 'o' followed by a number sets the output length, O (maximum 99). 2453 - Any other word is converted into a word followed by a period on 2454 the output. The output word consists of the input word truncated 2455 or padded out with hyphens to make its length equal to O. If O 2456 is 0, the word is output verbatim without truncating or padding. 2457 I and O are initially set to 1. When I changes, any buffered input is 2458 re-scanned according to the new I. EOF also terminates the last word. 2459 """ 2460 2461 def __init__(self, errors='strict'): 2462 codecs.IncrementalDecoder.__init__(self, errors) 2463 self.reset() 2464 2465 def __repr__(self): 2466 return '<SID %x>' % id(self) 2467 2468 def reset(self): 2469 self.i = 1 2470 self.o = 1 2471 self.buffer = bytearray() 2472 2473 def getstate(self): 2474 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2475 return bytes(self.buffer), i*100 + o 2476 2477 def setstate(self, state): 2478 buffer, io = state 2479 self.buffer = bytearray(buffer) 2480 i, o = divmod(io, 100) 2481 self.i, self.o = i ^ 1, o ^ 1 2482 2483 def decode(self, input, final=False): 2484 output = '' 2485 for b in input: 2486 if self.i == 0: # variable-length, terminated with period 2487 if b == ord('.'): 2488 if self.buffer: 2489 output += self.process_word() 2490 else: 2491 self.buffer.append(b) 2492 else: # fixed-length, terminate after self.i bytes 2493 self.buffer.append(b) 2494 if len(self.buffer) == self.i: 2495 output += self.process_word() 2496 if final and self.buffer: # EOF terminates the last word 2497 output += self.process_word() 2498 return output 2499 2500 def process_word(self): 2501 output = '' 2502 if self.buffer[0] == ord('i'): 2503 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2504 elif self.buffer[0] == ord('o'): 2505 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2506 else: 2507 output = self.buffer.decode('ascii') 2508 if len(output) < self.o: 2509 output += '-'*self.o # pad out with hyphens 2510 if self.o: 2511 output = output[:self.o] # truncate to output length 2512 output += '.' 2513 self.buffer = bytearray() 2514 return output 2515 2516 codecEnabled = False 2517 2518 2519# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak 2520# when registering codecs and cleanup functions. 2521def lookupTestDecoder(name): 2522 if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder': 2523 latin1 = codecs.lookup('latin-1') 2524 return codecs.CodecInfo( 2525 name='test_decoder', encode=latin1.encode, decode=None, 2526 incrementalencoder=None, 2527 streamreader=None, streamwriter=None, 2528 incrementaldecoder=StatefulIncrementalDecoder) 2529 2530 2531class StatefulIncrementalDecoderTest(unittest.TestCase): 2532 """ 2533 Make sure the StatefulIncrementalDecoder actually works. 2534 """ 2535 2536 test_cases = [ 2537 # I=1, O=1 (fixed-length input == fixed-length output) 2538 (b'abcd', False, 'a.b.c.d.'), 2539 # I=0, O=0 (variable-length input, variable-length output) 2540 (b'oiabcd', True, 'abcd.'), 2541 # I=0, O=0 (should ignore extra periods) 2542 (b'oi...abcd...', True, 'abcd.'), 2543 # I=0, O=6 (variable-length input, fixed-length output) 2544 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2545 # I=2, O=6 (fixed-length input < fixed-length output) 2546 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2547 # I=6, O=3 (fixed-length input > fixed-length output) 2548 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2549 # I=0, then 3; O=29, then 15 (with longer output) 2550 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2551 'a----------------------------.' + 2552 'b----------------------------.' + 2553 'cde--------------------------.' + 2554 'abcdefghijabcde.' + 2555 'a.b------------.' + 2556 '.c.------------.' + 2557 'd.e------------.' + 2558 'k--------------.' + 2559 'l--------------.' + 2560 'm--------------.') 2561 ] 2562 2563 def test_decoder(self): 2564 # Try a few one-shot test cases. 2565 for input, eof, output in self.test_cases: 2566 d = StatefulIncrementalDecoder() 2567 self.assertEqual(d.decode(input, eof), output) 2568 2569 # Also test an unfinished decode, followed by forcing EOF. 2570 d = StatefulIncrementalDecoder() 2571 self.assertEqual(d.decode(b'oiabcd'), '') 2572 self.assertEqual(d.decode(b'', 1), 'abcd.') 2573 2574class TextIOWrapperTest(unittest.TestCase): 2575 2576 def setUp(self): 2577 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2578 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2579 os_helper.unlink(os_helper.TESTFN) 2580 codecs.register(lookupTestDecoder) 2581 self.addCleanup(codecs.unregister, lookupTestDecoder) 2582 2583 def tearDown(self): 2584 os_helper.unlink(os_helper.TESTFN) 2585 2586 def test_constructor(self): 2587 r = self.BytesIO(b"\xc3\xa9\n\n") 2588 b = self.BufferedReader(r, 1000) 2589 t = self.TextIOWrapper(b, encoding="utf-8") 2590 t.__init__(b, encoding="latin-1", newline="\r\n") 2591 self.assertEqual(t.encoding, "latin-1") 2592 self.assertEqual(t.line_buffering, False) 2593 t.__init__(b, encoding="utf-8", line_buffering=True) 2594 self.assertEqual(t.encoding, "utf-8") 2595 self.assertEqual(t.line_buffering, True) 2596 self.assertEqual("\xe9\n", t.readline()) 2597 self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42) 2598 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') 2599 2600 def test_uninitialized(self): 2601 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2602 del t 2603 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2604 self.assertRaises(Exception, repr, t) 2605 self.assertRaisesRegex((ValueError, AttributeError), 2606 'uninitialized|has no attribute', 2607 t.read, 0) 2608 t.__init__(self.MockRawIO(), encoding="utf-8") 2609 self.assertEqual(t.read(0), '') 2610 2611 def test_non_text_encoding_codecs_are_rejected(self): 2612 # Ensure the constructor complains if passed a codec that isn't 2613 # marked as a text encoding 2614 # http://bugs.python.org/issue20404 2615 r = self.BytesIO() 2616 b = self.BufferedWriter(r) 2617 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2618 self.TextIOWrapper(b, encoding="hex") 2619 2620 def test_detach(self): 2621 r = self.BytesIO() 2622 b = self.BufferedWriter(r) 2623 t = self.TextIOWrapper(b, encoding="ascii") 2624 self.assertIs(t.detach(), b) 2625 2626 t = self.TextIOWrapper(b, encoding="ascii") 2627 t.write("howdy") 2628 self.assertFalse(r.getvalue()) 2629 t.detach() 2630 self.assertEqual(r.getvalue(), b"howdy") 2631 self.assertRaises(ValueError, t.detach) 2632 2633 # Operations independent of the detached stream should still work 2634 repr(t) 2635 self.assertEqual(t.encoding, "ascii") 2636 self.assertEqual(t.errors, "strict") 2637 self.assertFalse(t.line_buffering) 2638 self.assertFalse(t.write_through) 2639 2640 def test_repr(self): 2641 raw = self.BytesIO("hello".encode("utf-8")) 2642 b = self.BufferedReader(raw) 2643 t = self.TextIOWrapper(b, encoding="utf-8") 2644 modname = self.TextIOWrapper.__module__ 2645 self.assertRegex(repr(t), 2646 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) 2647 raw.name = "dummy" 2648 self.assertRegex(repr(t), 2649 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2650 t.mode = "r" 2651 self.assertRegex(repr(t), 2652 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2653 raw.name = b"dummy" 2654 self.assertRegex(repr(t), 2655 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2656 2657 t.buffer.detach() 2658 repr(t) # Should not raise an exception 2659 2660 def test_recursive_repr(self): 2661 # Issue #25455 2662 raw = self.BytesIO() 2663 t = self.TextIOWrapper(raw, encoding="utf-8") 2664 with support.swap_attr(raw, 'name', t): 2665 try: 2666 repr(t) # Should not crash 2667 except RuntimeError: 2668 pass 2669 2670 def test_line_buffering(self): 2671 r = self.BytesIO() 2672 b = self.BufferedWriter(r, 1000) 2673 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) 2674 t.write("X") 2675 self.assertEqual(r.getvalue(), b"") # No flush happened 2676 t.write("Y\nZ") 2677 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2678 t.write("A\rB") 2679 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2680 2681 def test_reconfigure_line_buffering(self): 2682 r = self.BytesIO() 2683 b = self.BufferedWriter(r, 1000) 2684 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) 2685 t.write("AB\nC") 2686 self.assertEqual(r.getvalue(), b"") 2687 2688 t.reconfigure(line_buffering=True) # implicit flush 2689 self.assertEqual(r.getvalue(), b"AB\nC") 2690 t.write("DEF\nG") 2691 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2692 t.write("H") 2693 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2694 t.reconfigure(line_buffering=False) # implicit flush 2695 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2696 t.write("IJ") 2697 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2698 2699 # Keeping default value 2700 t.reconfigure() 2701 t.reconfigure(line_buffering=None) 2702 self.assertEqual(t.line_buffering, False) 2703 t.reconfigure(line_buffering=True) 2704 t.reconfigure() 2705 t.reconfigure(line_buffering=None) 2706 self.assertEqual(t.line_buffering, True) 2707 2708 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2709 def test_default_encoding(self): 2710 old_environ = dict(os.environ) 2711 try: 2712 # try to get a user preferred encoding different than the current 2713 # locale encoding to check that TextIOWrapper() uses the current 2714 # locale encoding and not the user preferred encoding 2715 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2716 if key in os.environ: 2717 del os.environ[key] 2718 2719 current_locale_encoding = locale.getpreferredencoding(False) 2720 b = self.BytesIO() 2721 with warnings.catch_warnings(): 2722 warnings.simplefilter("ignore", EncodingWarning) 2723 t = self.TextIOWrapper(b) 2724 self.assertEqual(t.encoding, current_locale_encoding) 2725 finally: 2726 os.environ.clear() 2727 os.environ.update(old_environ) 2728 2729 @support.cpython_only 2730 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2731 def test_device_encoding(self): 2732 # Issue 15989 2733 import _testcapi 2734 b = self.BytesIO() 2735 b.fileno = lambda: _testcapi.INT_MAX + 1 2736 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale") 2737 b.fileno = lambda: _testcapi.UINT_MAX + 1 2738 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale") 2739 2740 def test_encoding(self): 2741 # Check the encoding attribute is always set, and valid 2742 b = self.BytesIO() 2743 t = self.TextIOWrapper(b, encoding="utf-8") 2744 self.assertEqual(t.encoding, "utf-8") 2745 with warnings.catch_warnings(): 2746 warnings.simplefilter("ignore", EncodingWarning) 2747 t = self.TextIOWrapper(b) 2748 self.assertIsNotNone(t.encoding) 2749 codecs.lookup(t.encoding) 2750 2751 def test_encoding_errors_reading(self): 2752 # (1) default 2753 b = self.BytesIO(b"abc\n\xff\n") 2754 t = self.TextIOWrapper(b, encoding="ascii") 2755 self.assertRaises(UnicodeError, t.read) 2756 # (2) explicit strict 2757 b = self.BytesIO(b"abc\n\xff\n") 2758 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2759 self.assertRaises(UnicodeError, t.read) 2760 # (3) ignore 2761 b = self.BytesIO(b"abc\n\xff\n") 2762 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2763 self.assertEqual(t.read(), "abc\n\n") 2764 # (4) replace 2765 b = self.BytesIO(b"abc\n\xff\n") 2766 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2767 self.assertEqual(t.read(), "abc\n\ufffd\n") 2768 2769 def test_encoding_errors_writing(self): 2770 # (1) default 2771 b = self.BytesIO() 2772 t = self.TextIOWrapper(b, encoding="ascii") 2773 self.assertRaises(UnicodeError, t.write, "\xff") 2774 # (2) explicit strict 2775 b = self.BytesIO() 2776 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2777 self.assertRaises(UnicodeError, t.write, "\xff") 2778 # (3) ignore 2779 b = self.BytesIO() 2780 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2781 newline="\n") 2782 t.write("abc\xffdef\n") 2783 t.flush() 2784 self.assertEqual(b.getvalue(), b"abcdef\n") 2785 # (4) replace 2786 b = self.BytesIO() 2787 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2788 newline="\n") 2789 t.write("abc\xffdef\n") 2790 t.flush() 2791 self.assertEqual(b.getvalue(), b"abc?def\n") 2792 2793 def test_newlines(self): 2794 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2795 2796 tests = [ 2797 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2798 [ '', input_lines ], 2799 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2800 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2801 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2802 ] 2803 encodings = ( 2804 'utf-8', 'latin-1', 2805 'utf-16', 'utf-16-le', 'utf-16-be', 2806 'utf-32', 'utf-32-le', 'utf-32-be', 2807 ) 2808 2809 # Try a range of buffer sizes to test the case where \r is the last 2810 # character in TextIOWrapper._pending_line. 2811 for encoding in encodings: 2812 # XXX: str.encode() should return bytes 2813 data = bytes(''.join(input_lines).encode(encoding)) 2814 for do_reads in (False, True): 2815 for bufsize in range(1, 10): 2816 for newline, exp_lines in tests: 2817 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2818 textio = self.TextIOWrapper(bufio, newline=newline, 2819 encoding=encoding) 2820 if do_reads: 2821 got_lines = [] 2822 while True: 2823 c2 = textio.read(2) 2824 if c2 == '': 2825 break 2826 self.assertEqual(len(c2), 2) 2827 got_lines.append(c2 + textio.readline()) 2828 else: 2829 got_lines = list(textio) 2830 2831 for got_line, exp_line in zip(got_lines, exp_lines): 2832 self.assertEqual(got_line, exp_line) 2833 self.assertEqual(len(got_lines), len(exp_lines)) 2834 2835 def test_newlines_input(self): 2836 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2837 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2838 for newline, expected in [ 2839 (None, normalized.decode("ascii").splitlines(keepends=True)), 2840 ("", testdata.decode("ascii").splitlines(keepends=True)), 2841 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2842 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2843 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2844 ]: 2845 buf = self.BytesIO(testdata) 2846 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2847 self.assertEqual(txt.readlines(), expected) 2848 txt.seek(0) 2849 self.assertEqual(txt.read(), "".join(expected)) 2850 2851 def test_newlines_output(self): 2852 testdict = { 2853 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2854 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2855 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2856 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2857 } 2858 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2859 for newline, expected in tests: 2860 buf = self.BytesIO() 2861 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2862 txt.write("AAA\nB") 2863 txt.write("BB\nCCC\n") 2864 txt.write("X\rY\r\nZ") 2865 txt.flush() 2866 self.assertEqual(buf.closed, False) 2867 self.assertEqual(buf.getvalue(), expected) 2868 2869 def test_destructor(self): 2870 l = [] 2871 base = self.BytesIO 2872 class MyBytesIO(base): 2873 def close(self): 2874 l.append(self.getvalue()) 2875 base.close(self) 2876 b = MyBytesIO() 2877 t = self.TextIOWrapper(b, encoding="ascii") 2878 t.write("abc") 2879 del t 2880 support.gc_collect() 2881 self.assertEqual([b"abc"], l) 2882 2883 def test_override_destructor(self): 2884 record = [] 2885 class MyTextIO(self.TextIOWrapper): 2886 def __del__(self): 2887 record.append(1) 2888 try: 2889 f = super().__del__ 2890 except AttributeError: 2891 pass 2892 else: 2893 f() 2894 def close(self): 2895 record.append(2) 2896 super().close() 2897 def flush(self): 2898 record.append(3) 2899 super().flush() 2900 b = self.BytesIO() 2901 t = MyTextIO(b, encoding="ascii") 2902 del t 2903 support.gc_collect() 2904 self.assertEqual(record, [1, 2, 3]) 2905 2906 def test_error_through_destructor(self): 2907 # Test that the exception state is not modified by a destructor, 2908 # even if close() fails. 2909 rawio = self.CloseFailureIO() 2910 with support.catch_unraisable_exception() as cm: 2911 with self.assertRaises(AttributeError): 2912 self.TextIOWrapper(rawio, encoding="utf-8").xyzzy 2913 2914 if not IOBASE_EMITS_UNRAISABLE: 2915 self.assertIsNone(cm.unraisable) 2916 elif cm.unraisable is not None: 2917 self.assertEqual(cm.unraisable.exc_type, OSError) 2918 2919 # Systematic tests of the text I/O API 2920 2921 def test_basic_io(self): 2922 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2923 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2924 f = self.open(os_helper.TESTFN, "w+", encoding=enc) 2925 f._CHUNK_SIZE = chunksize 2926 self.assertEqual(f.write("abc"), 3) 2927 f.close() 2928 f = self.open(os_helper.TESTFN, "r+", encoding=enc) 2929 f._CHUNK_SIZE = chunksize 2930 self.assertEqual(f.tell(), 0) 2931 self.assertEqual(f.read(), "abc") 2932 cookie = f.tell() 2933 self.assertEqual(f.seek(0), 0) 2934 self.assertEqual(f.read(None), "abc") 2935 f.seek(0) 2936 self.assertEqual(f.read(2), "ab") 2937 self.assertEqual(f.read(1), "c") 2938 self.assertEqual(f.read(1), "") 2939 self.assertEqual(f.read(), "") 2940 self.assertEqual(f.tell(), cookie) 2941 self.assertEqual(f.seek(0), 0) 2942 self.assertEqual(f.seek(0, 2), cookie) 2943 self.assertEqual(f.write("def"), 3) 2944 self.assertEqual(f.seek(cookie), cookie) 2945 self.assertEqual(f.read(), "def") 2946 if enc.startswith("utf"): 2947 self.multi_line_test(f, enc) 2948 f.close() 2949 2950 def multi_line_test(self, f, enc): 2951 f.seek(0) 2952 f.truncate() 2953 sample = "s\xff\u0fff\uffff" 2954 wlines = [] 2955 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2956 chars = [] 2957 for i in range(size): 2958 chars.append(sample[i % len(sample)]) 2959 line = "".join(chars) + "\n" 2960 wlines.append((f.tell(), line)) 2961 f.write(line) 2962 f.seek(0) 2963 rlines = [] 2964 while True: 2965 pos = f.tell() 2966 line = f.readline() 2967 if not line: 2968 break 2969 rlines.append((pos, line)) 2970 self.assertEqual(rlines, wlines) 2971 2972 def test_telling(self): 2973 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") 2974 p0 = f.tell() 2975 f.write("\xff\n") 2976 p1 = f.tell() 2977 f.write("\xff\n") 2978 p2 = f.tell() 2979 f.seek(0) 2980 self.assertEqual(f.tell(), p0) 2981 self.assertEqual(f.readline(), "\xff\n") 2982 self.assertEqual(f.tell(), p1) 2983 self.assertEqual(f.readline(), "\xff\n") 2984 self.assertEqual(f.tell(), p2) 2985 f.seek(0) 2986 for line in f: 2987 self.assertEqual(line, "\xff\n") 2988 self.assertRaises(OSError, f.tell) 2989 self.assertEqual(f.tell(), p2) 2990 f.close() 2991 2992 def test_seeking(self): 2993 chunk_size = _default_chunk_size() 2994 prefix_size = chunk_size - 2 2995 u_prefix = "a" * prefix_size 2996 prefix = bytes(u_prefix.encode("utf-8")) 2997 self.assertEqual(len(u_prefix), len(prefix)) 2998 u_suffix = "\u8888\n" 2999 suffix = bytes(u_suffix.encode("utf-8")) 3000 line = prefix + suffix 3001 with self.open(os_helper.TESTFN, "wb") as f: 3002 f.write(line*2) 3003 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 3004 s = f.read(prefix_size) 3005 self.assertEqual(s, str(prefix, "ascii")) 3006 self.assertEqual(f.tell(), prefix_size) 3007 self.assertEqual(f.readline(), u_suffix) 3008 3009 def test_seeking_too(self): 3010 # Regression test for a specific bug 3011 data = b'\xe0\xbf\xbf\n' 3012 with self.open(os_helper.TESTFN, "wb") as f: 3013 f.write(data) 3014 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 3015 f._CHUNK_SIZE # Just test that it exists 3016 f._CHUNK_SIZE = 2 3017 f.readline() 3018 f.tell() 3019 3020 def test_seek_and_tell(self): 3021 #Test seek/tell using the StatefulIncrementalDecoder. 3022 # Make test faster by doing smaller seeks 3023 CHUNK_SIZE = 128 3024 3025 def test_seek_and_tell_with_data(data, min_pos=0): 3026 """Tell/seek to various points within a data stream and ensure 3027 that the decoded data returned by read() is consistent.""" 3028 f = self.open(os_helper.TESTFN, 'wb') 3029 f.write(data) 3030 f.close() 3031 f = self.open(os_helper.TESTFN, encoding='test_decoder') 3032 f._CHUNK_SIZE = CHUNK_SIZE 3033 decoded = f.read() 3034 f.close() 3035 3036 for i in range(min_pos, len(decoded) + 1): # seek positions 3037 for j in [1, 5, len(decoded) - i]: # read lengths 3038 f = self.open(os_helper.TESTFN, encoding='test_decoder') 3039 self.assertEqual(f.read(i), decoded[:i]) 3040 cookie = f.tell() 3041 self.assertEqual(f.read(j), decoded[i:i + j]) 3042 f.seek(cookie) 3043 self.assertEqual(f.read(), decoded[i:]) 3044 f.close() 3045 3046 # Enable the test decoder. 3047 StatefulIncrementalDecoder.codecEnabled = 1 3048 3049 # Run the tests. 3050 try: 3051 # Try each test case. 3052 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3053 test_seek_and_tell_with_data(input) 3054 3055 # Position each test case so that it crosses a chunk boundary. 3056 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3057 offset = CHUNK_SIZE - len(input)//2 3058 prefix = b'.'*offset 3059 # Don't bother seeking into the prefix (takes too long). 3060 min_pos = offset*2 3061 test_seek_and_tell_with_data(prefix + input, min_pos) 3062 3063 # Ensure our test decoder won't interfere with subsequent tests. 3064 finally: 3065 StatefulIncrementalDecoder.codecEnabled = 0 3066 3067 def test_multibyte_seek_and_tell(self): 3068 f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") 3069 f.write("AB\n\u3046\u3048\n") 3070 f.close() 3071 3072 f = self.open(os_helper.TESTFN, "r", encoding="euc_jp") 3073 self.assertEqual(f.readline(), "AB\n") 3074 p0 = f.tell() 3075 self.assertEqual(f.readline(), "\u3046\u3048\n") 3076 p1 = f.tell() 3077 f.seek(p0) 3078 self.assertEqual(f.readline(), "\u3046\u3048\n") 3079 self.assertEqual(f.tell(), p1) 3080 f.close() 3081 3082 def test_seek_with_encoder_state(self): 3083 f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") 3084 f.write("\u00e6\u0300") 3085 p0 = f.tell() 3086 f.write("\u00e6") 3087 f.seek(p0) 3088 f.write("\u0300") 3089 f.close() 3090 3091 f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") 3092 self.assertEqual(f.readline(), "\u00e6\u0300\u0300") 3093 f.close() 3094 3095 def test_encoded_writes(self): 3096 data = "1234567890" 3097 tests = ("utf-16", 3098 "utf-16-le", 3099 "utf-16-be", 3100 "utf-32", 3101 "utf-32-le", 3102 "utf-32-be") 3103 for encoding in tests: 3104 buf = self.BytesIO() 3105 f = self.TextIOWrapper(buf, encoding=encoding) 3106 # Check if the BOM is written only once (see issue1753). 3107 f.write(data) 3108 f.write(data) 3109 f.seek(0) 3110 self.assertEqual(f.read(), data * 2) 3111 f.seek(0) 3112 self.assertEqual(f.read(), data * 2) 3113 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 3114 3115 def test_unreadable(self): 3116 class UnReadable(self.BytesIO): 3117 def readable(self): 3118 return False 3119 txt = self.TextIOWrapper(UnReadable(), encoding="utf-8") 3120 self.assertRaises(OSError, txt.read) 3121 3122 def test_read_one_by_one(self): 3123 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8") 3124 reads = "" 3125 while True: 3126 c = txt.read(1) 3127 if not c: 3128 break 3129 reads += c 3130 self.assertEqual(reads, "AA\nBB") 3131 3132 def test_readlines(self): 3133 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8") 3134 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 3135 txt.seek(0) 3136 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 3137 txt.seek(0) 3138 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 3139 3140 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 3141 def test_read_by_chunk(self): 3142 # make sure "\r\n" straddles 128 char boundary. 3143 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8") 3144 reads = "" 3145 while True: 3146 c = txt.read(128) 3147 if not c: 3148 break 3149 reads += c 3150 self.assertEqual(reads, "A"*127+"\nB") 3151 3152 def test_writelines(self): 3153 l = ['ab', 'cd', 'ef'] 3154 buf = self.BytesIO() 3155 txt = self.TextIOWrapper(buf, encoding="utf-8") 3156 txt.writelines(l) 3157 txt.flush() 3158 self.assertEqual(buf.getvalue(), b'abcdef') 3159 3160 def test_writelines_userlist(self): 3161 l = UserList(['ab', 'cd', 'ef']) 3162 buf = self.BytesIO() 3163 txt = self.TextIOWrapper(buf, encoding="utf-8") 3164 txt.writelines(l) 3165 txt.flush() 3166 self.assertEqual(buf.getvalue(), b'abcdef') 3167 3168 def test_writelines_error(self): 3169 txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8") 3170 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 3171 self.assertRaises(TypeError, txt.writelines, None) 3172 self.assertRaises(TypeError, txt.writelines, b'abc') 3173 3174 def test_issue1395_1(self): 3175 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3176 3177 # read one char at a time 3178 reads = "" 3179 while True: 3180 c = txt.read(1) 3181 if not c: 3182 break 3183 reads += c 3184 self.assertEqual(reads, self.normalized) 3185 3186 def test_issue1395_2(self): 3187 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3188 txt._CHUNK_SIZE = 4 3189 3190 reads = "" 3191 while True: 3192 c = txt.read(4) 3193 if not c: 3194 break 3195 reads += c 3196 self.assertEqual(reads, self.normalized) 3197 3198 def test_issue1395_3(self): 3199 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3200 txt._CHUNK_SIZE = 4 3201 3202 reads = txt.read(4) 3203 reads += txt.read(4) 3204 reads += txt.readline() 3205 reads += txt.readline() 3206 reads += txt.readline() 3207 self.assertEqual(reads, self.normalized) 3208 3209 def test_issue1395_4(self): 3210 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3211 txt._CHUNK_SIZE = 4 3212 3213 reads = txt.read(4) 3214 reads += txt.read() 3215 self.assertEqual(reads, self.normalized) 3216 3217 def test_issue1395_5(self): 3218 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3219 txt._CHUNK_SIZE = 4 3220 3221 reads = txt.read(4) 3222 pos = txt.tell() 3223 txt.seek(0) 3224 txt.seek(pos) 3225 self.assertEqual(txt.read(4), "BBB\n") 3226 3227 def test_issue2282(self): 3228 buffer = self.BytesIO(self.testdata) 3229 txt = self.TextIOWrapper(buffer, encoding="ascii") 3230 3231 self.assertEqual(buffer.seekable(), txt.seekable()) 3232 3233 def test_append_bom(self): 3234 # The BOM is not written again when appending to a non-empty file 3235 filename = os_helper.TESTFN 3236 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3237 with self.open(filename, 'w', encoding=charset) as f: 3238 f.write('aaa') 3239 pos = f.tell() 3240 with self.open(filename, 'rb') as f: 3241 self.assertEqual(f.read(), 'aaa'.encode(charset)) 3242 3243 with self.open(filename, 'a', encoding=charset) as f: 3244 f.write('xxx') 3245 with self.open(filename, 'rb') as f: 3246 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3247 3248 def test_seek_bom(self): 3249 # Same test, but when seeking manually 3250 filename = os_helper.TESTFN 3251 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3252 with self.open(filename, 'w', encoding=charset) as f: 3253 f.write('aaa') 3254 pos = f.tell() 3255 with self.open(filename, 'r+', encoding=charset) as f: 3256 f.seek(pos) 3257 f.write('zzz') 3258 f.seek(0) 3259 f.write('bbb') 3260 with self.open(filename, 'rb') as f: 3261 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 3262 3263 def test_seek_append_bom(self): 3264 # Same test, but first seek to the start and then to the end 3265 filename = os_helper.TESTFN 3266 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3267 with self.open(filename, 'w', encoding=charset) as f: 3268 f.write('aaa') 3269 with self.open(filename, 'a', encoding=charset) as f: 3270 f.seek(0) 3271 f.seek(0, self.SEEK_END) 3272 f.write('xxx') 3273 with self.open(filename, 'rb') as f: 3274 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3275 3276 def test_errors_property(self): 3277 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 3278 self.assertEqual(f.errors, "strict") 3279 with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f: 3280 self.assertEqual(f.errors, "replace") 3281 3282 @support.no_tracing 3283 def test_threads_write(self): 3284 # Issue6750: concurrent writes could duplicate data 3285 event = threading.Event() 3286 with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f: 3287 def run(n): 3288 text = "Thread%03d\n" % n 3289 event.wait() 3290 f.write(text) 3291 threads = [threading.Thread(target=run, args=(x,)) 3292 for x in range(20)] 3293 with threading_helper.start_threads(threads, event.set): 3294 time.sleep(0.02) 3295 with self.open(os_helper.TESTFN, encoding="utf-8") as f: 3296 content = f.read() 3297 for n in range(20): 3298 self.assertEqual(content.count("Thread%03d\n" % n), 1) 3299 3300 def test_flush_error_on_close(self): 3301 # Test that text file is closed despite failed flush 3302 # and that flush() is called before file closed. 3303 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3304 closed = [] 3305 def bad_flush(): 3306 closed[:] = [txt.closed, txt.buffer.closed] 3307 raise OSError() 3308 txt.flush = bad_flush 3309 self.assertRaises(OSError, txt.close) # exception not swallowed 3310 self.assertTrue(txt.closed) 3311 self.assertTrue(txt.buffer.closed) 3312 self.assertTrue(closed) # flush() called 3313 self.assertFalse(closed[0]) # flush() called before file closed 3314 self.assertFalse(closed[1]) 3315 txt.flush = lambda: None # break reference loop 3316 3317 def test_close_error_on_close(self): 3318 buffer = self.BytesIO(self.testdata) 3319 def bad_flush(): 3320 raise OSError('flush') 3321 def bad_close(): 3322 raise OSError('close') 3323 buffer.close = bad_close 3324 txt = self.TextIOWrapper(buffer, encoding="ascii") 3325 txt.flush = bad_flush 3326 with self.assertRaises(OSError) as err: # exception not swallowed 3327 txt.close() 3328 self.assertEqual(err.exception.args, ('close',)) 3329 self.assertIsInstance(err.exception.__context__, OSError) 3330 self.assertEqual(err.exception.__context__.args, ('flush',)) 3331 self.assertFalse(txt.closed) 3332 3333 # Silence destructor error 3334 buffer.close = lambda: None 3335 txt.flush = lambda: None 3336 3337 def test_nonnormalized_close_error_on_close(self): 3338 # Issue #21677 3339 buffer = self.BytesIO(self.testdata) 3340 def bad_flush(): 3341 raise non_existing_flush 3342 def bad_close(): 3343 raise non_existing_close 3344 buffer.close = bad_close 3345 txt = self.TextIOWrapper(buffer, encoding="ascii") 3346 txt.flush = bad_flush 3347 with self.assertRaises(NameError) as err: # exception not swallowed 3348 txt.close() 3349 self.assertIn('non_existing_close', str(err.exception)) 3350 self.assertIsInstance(err.exception.__context__, NameError) 3351 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3352 self.assertFalse(txt.closed) 3353 3354 # Silence destructor error 3355 buffer.close = lambda: None 3356 txt.flush = lambda: None 3357 3358 def test_multi_close(self): 3359 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3360 txt.close() 3361 txt.close() 3362 txt.close() 3363 self.assertRaises(ValueError, txt.flush) 3364 3365 def test_unseekable(self): 3366 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8") 3367 self.assertRaises(self.UnsupportedOperation, txt.tell) 3368 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3369 3370 def test_readonly_attributes(self): 3371 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3372 buf = self.BytesIO(self.testdata) 3373 with self.assertRaises(AttributeError): 3374 txt.buffer = buf 3375 3376 def test_rawio(self): 3377 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3378 # that subprocess.Popen() can have the required unbuffered 3379 # semantics with universal_newlines=True. 3380 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3381 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3382 # Reads 3383 self.assertEqual(txt.read(4), 'abcd') 3384 self.assertEqual(txt.readline(), 'efghi\n') 3385 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3386 3387 def test_rawio_write_through(self): 3388 # Issue #12591: with write_through=True, writes don't need a flush 3389 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3390 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3391 write_through=True) 3392 txt.write('1') 3393 txt.write('23\n4') 3394 txt.write('5') 3395 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3396 3397 def test_bufio_write_through(self): 3398 # Issue #21396: write_through=True doesn't force a flush() 3399 # on the underlying binary buffered object. 3400 flush_called, write_called = [], [] 3401 class BufferedWriter(self.BufferedWriter): 3402 def flush(self, *args, **kwargs): 3403 flush_called.append(True) 3404 return super().flush(*args, **kwargs) 3405 def write(self, *args, **kwargs): 3406 write_called.append(True) 3407 return super().write(*args, **kwargs) 3408 3409 rawio = self.BytesIO() 3410 data = b"a" 3411 bufio = BufferedWriter(rawio, len(data)*2) 3412 textio = self.TextIOWrapper(bufio, encoding='ascii', 3413 write_through=True) 3414 # write to the buffered io but don't overflow the buffer 3415 text = data.decode('ascii') 3416 textio.write(text) 3417 3418 # buffer.flush is not called with write_through=True 3419 self.assertFalse(flush_called) 3420 # buffer.write *is* called with write_through=True 3421 self.assertTrue(write_called) 3422 self.assertEqual(rawio.getvalue(), b"") # no flush 3423 3424 write_called = [] # reset 3425 textio.write(text * 10) # total content is larger than bufio buffer 3426 self.assertTrue(write_called) 3427 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3428 3429 def test_reconfigure_write_through(self): 3430 raw = self.MockRawIO([]) 3431 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3432 t.write('1') 3433 t.reconfigure(write_through=True) # implied flush 3434 self.assertEqual(t.write_through, True) 3435 self.assertEqual(b''.join(raw._write_stack), b'1') 3436 t.write('23') 3437 self.assertEqual(b''.join(raw._write_stack), b'123') 3438 t.reconfigure(write_through=False) 3439 self.assertEqual(t.write_through, False) 3440 t.write('45') 3441 t.flush() 3442 self.assertEqual(b''.join(raw._write_stack), b'12345') 3443 # Keeping default value 3444 t.reconfigure() 3445 t.reconfigure(write_through=None) 3446 self.assertEqual(t.write_through, False) 3447 t.reconfigure(write_through=True) 3448 t.reconfigure() 3449 t.reconfigure(write_through=None) 3450 self.assertEqual(t.write_through, True) 3451 3452 def test_read_nonbytes(self): 3453 # Issue #17106 3454 # Crash when underlying read() returns non-bytes 3455 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") 3456 self.assertRaises(TypeError, t.read, 1) 3457 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") 3458 self.assertRaises(TypeError, t.readline) 3459 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") 3460 self.assertRaises(TypeError, t.read) 3461 3462 def test_illegal_encoder(self): 3463 # Issue 31271: Calling write() while the return value of encoder's 3464 # encode() is invalid shouldn't cause an assertion failure. 3465 rot13 = codecs.lookup("rot13") 3466 with support.swap_attr(rot13, '_is_text_encoding', True): 3467 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 3468 self.assertRaises(TypeError, t.write, 'bar') 3469 3470 def test_illegal_decoder(self): 3471 # Issue #17106 3472 # Bypass the early encoding check added in issue 20404 3473 def _make_illegal_wrapper(): 3474 quopri = codecs.lookup("quopri") 3475 quopri._is_text_encoding = True 3476 try: 3477 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3478 newline='\n', encoding="quopri") 3479 finally: 3480 quopri._is_text_encoding = False 3481 return t 3482 # Crash when decoder returns non-string 3483 t = _make_illegal_wrapper() 3484 self.assertRaises(TypeError, t.read, 1) 3485 t = _make_illegal_wrapper() 3486 self.assertRaises(TypeError, t.readline) 3487 t = _make_illegal_wrapper() 3488 self.assertRaises(TypeError, t.read) 3489 3490 # Issue 31243: calling read() while the return value of decoder's 3491 # getstate() is invalid should neither crash the interpreter nor 3492 # raise a SystemError. 3493 def _make_very_illegal_wrapper(getstate_ret_val): 3494 class BadDecoder: 3495 def getstate(self): 3496 return getstate_ret_val 3497 def _get_bad_decoder(dummy): 3498 return BadDecoder() 3499 quopri = codecs.lookup("quopri") 3500 with support.swap_attr(quopri, 'incrementaldecoder', 3501 _get_bad_decoder): 3502 return _make_illegal_wrapper() 3503 t = _make_very_illegal_wrapper(42) 3504 self.assertRaises(TypeError, t.read, 42) 3505 t = _make_very_illegal_wrapper(()) 3506 self.assertRaises(TypeError, t.read, 42) 3507 t = _make_very_illegal_wrapper((1, 2)) 3508 self.assertRaises(TypeError, t.read, 42) 3509 3510 def _check_create_at_shutdown(self, **kwargs): 3511 # Issue #20037: creating a TextIOWrapper at shutdown 3512 # shouldn't crash the interpreter. 3513 iomod = self.io.__name__ 3514 code = """if 1: 3515 import codecs 3516 import {iomod} as io 3517 3518 # Avoid looking up codecs at shutdown 3519 codecs.lookup('utf-8') 3520 3521 class C: 3522 def __init__(self): 3523 self.buf = io.BytesIO() 3524 def __del__(self): 3525 io.TextIOWrapper(self.buf, **{kwargs}) 3526 print("ok") 3527 c = C() 3528 """.format(iomod=iomod, kwargs=kwargs) 3529 return assert_python_ok("-c", code) 3530 3531 def test_create_at_shutdown_without_encoding(self): 3532 rc, out, err = self._check_create_at_shutdown() 3533 if err: 3534 # Can error out with a RuntimeError if the module state 3535 # isn't found. 3536 self.assertIn(self.shutdown_error, err.decode()) 3537 else: 3538 self.assertEqual("ok", out.decode().strip()) 3539 3540 def test_create_at_shutdown_with_encoding(self): 3541 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3542 errors='strict') 3543 self.assertFalse(err) 3544 self.assertEqual("ok", out.decode().strip()) 3545 3546 def test_read_byteslike(self): 3547 r = MemviewBytesIO(b'Just some random string\n') 3548 t = self.TextIOWrapper(r, 'utf-8') 3549 3550 # TextIOwrapper will not read the full string, because 3551 # we truncate it to a multiple of the native int size 3552 # so that we can construct a more complex memoryview. 3553 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3554 3555 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3556 3557 def test_issue22849(self): 3558 class F(object): 3559 def readable(self): return True 3560 def writable(self): return True 3561 def seekable(self): return True 3562 3563 for i in range(10): 3564 try: 3565 self.TextIOWrapper(F(), encoding='utf-8') 3566 except Exception: 3567 pass 3568 3569 F.tell = lambda x: 0 3570 t = self.TextIOWrapper(F(), encoding='utf-8') 3571 3572 def test_reconfigure_encoding_read(self): 3573 # latin1 -> utf8 3574 # (latin1 can decode utf-8 encoded string) 3575 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') 3576 raw = self.BytesIO(data) 3577 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3578 self.assertEqual(txt.readline(), 'abc\xe9\n') 3579 with self.assertRaises(self.UnsupportedOperation): 3580 txt.reconfigure(encoding='utf-8') 3581 with self.assertRaises(self.UnsupportedOperation): 3582 txt.reconfigure(newline=None) 3583 3584 def test_reconfigure_write_fromascii(self): 3585 # ascii has a specific encodefunc in the C implementation, 3586 # but utf-8-sig has not. Make sure that we get rid of the 3587 # cached encodefunc when we switch encoders. 3588 raw = self.BytesIO() 3589 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3590 txt.write('foo\n') 3591 txt.reconfigure(encoding='utf-8-sig') 3592 txt.write('\xe9\n') 3593 txt.flush() 3594 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') 3595 3596 def test_reconfigure_write(self): 3597 # latin -> utf8 3598 raw = self.BytesIO() 3599 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3600 txt.write('abc\xe9\n') 3601 txt.reconfigure(encoding='utf-8') 3602 self.assertEqual(raw.getvalue(), b'abc\xe9\n') 3603 txt.write('d\xe9f\n') 3604 txt.flush() 3605 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') 3606 3607 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of 3608 # the file 3609 raw = self.BytesIO() 3610 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3611 txt.write('abc\n') 3612 txt.reconfigure(encoding='utf-8-sig') 3613 txt.write('d\xe9f\n') 3614 txt.flush() 3615 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') 3616 3617 def test_reconfigure_write_non_seekable(self): 3618 raw = self.BytesIO() 3619 raw.seekable = lambda: False 3620 raw.seek = None 3621 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3622 txt.write('abc\n') 3623 txt.reconfigure(encoding='utf-8-sig') 3624 txt.write('d\xe9f\n') 3625 txt.flush() 3626 3627 # If the raw stream is not seekable, there'll be a BOM 3628 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') 3629 3630 def test_reconfigure_defaults(self): 3631 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') 3632 txt.reconfigure(encoding=None) 3633 self.assertEqual(txt.encoding, 'ascii') 3634 self.assertEqual(txt.errors, 'replace') 3635 txt.write('LF\n') 3636 3637 txt.reconfigure(newline='\r\n') 3638 self.assertEqual(txt.encoding, 'ascii') 3639 self.assertEqual(txt.errors, 'replace') 3640 3641 txt.reconfigure(errors='ignore') 3642 self.assertEqual(txt.encoding, 'ascii') 3643 self.assertEqual(txt.errors, 'ignore') 3644 txt.write('CRLF\n') 3645 3646 txt.reconfigure(encoding='utf-8', newline=None) 3647 self.assertEqual(txt.errors, 'strict') 3648 txt.seek(0) 3649 self.assertEqual(txt.read(), 'LF\nCRLF\n') 3650 3651 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') 3652 3653 def test_reconfigure_newline(self): 3654 raw = self.BytesIO(b'CR\rEOF') 3655 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3656 txt.reconfigure(newline=None) 3657 self.assertEqual(txt.readline(), 'CR\n') 3658 raw = self.BytesIO(b'CR\rEOF') 3659 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3660 txt.reconfigure(newline='') 3661 self.assertEqual(txt.readline(), 'CR\r') 3662 raw = self.BytesIO(b'CR\rLF\nEOF') 3663 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3664 txt.reconfigure(newline='\n') 3665 self.assertEqual(txt.readline(), 'CR\rLF\n') 3666 raw = self.BytesIO(b'LF\nCR\rEOF') 3667 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3668 txt.reconfigure(newline='\r') 3669 self.assertEqual(txt.readline(), 'LF\nCR\r') 3670 raw = self.BytesIO(b'CR\rCRLF\r\nEOF') 3671 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3672 txt.reconfigure(newline='\r\n') 3673 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') 3674 3675 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') 3676 txt.reconfigure(newline=None) 3677 txt.write('linesep\n') 3678 txt.reconfigure(newline='') 3679 txt.write('LF\n') 3680 txt.reconfigure(newline='\n') 3681 txt.write('LF\n') 3682 txt.reconfigure(newline='\r') 3683 txt.write('CR\n') 3684 txt.reconfigure(newline='\r\n') 3685 txt.write('CRLF\n') 3686 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' 3687 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) 3688 3689 def test_issue25862(self): 3690 # Assertion failures occurred in tell() after read() and write(). 3691 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3692 t.read(1) 3693 t.read() 3694 t.tell() 3695 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3696 t.read(1) 3697 t.write('x') 3698 t.tell() 3699 3700 3701class MemviewBytesIO(io.BytesIO): 3702 '''A BytesIO object whose read method returns memoryviews 3703 rather than bytes''' 3704 3705 def read1(self, len_): 3706 return _to_memoryview(super().read1(len_)) 3707 3708 def read(self, len_): 3709 return _to_memoryview(super().read(len_)) 3710 3711def _to_memoryview(buf): 3712 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3713 3714 arr = array.array('i') 3715 idx = len(buf) - len(buf) % arr.itemsize 3716 arr.frombytes(buf[:idx]) 3717 return memoryview(arr) 3718 3719 3720class CTextIOWrapperTest(TextIOWrapperTest): 3721 io = io 3722 shutdown_error = "LookupError: unknown encoding: ascii" 3723 3724 def test_initialization(self): 3725 r = self.BytesIO(b"\xc3\xa9\n\n") 3726 b = self.BufferedReader(r, 1000) 3727 t = self.TextIOWrapper(b, encoding="utf-8") 3728 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') 3729 self.assertRaises(ValueError, t.read) 3730 3731 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3732 self.assertRaises(Exception, repr, t) 3733 3734 def test_garbage_collection(self): 3735 # C TextIOWrapper objects are collected, and collecting them flushes 3736 # all data to disk. 3737 # The Python version has __del__, so it ends in gc.garbage instead. 3738 with warnings_helper.check_warnings(('', ResourceWarning)): 3739 rawio = io.FileIO(os_helper.TESTFN, "wb") 3740 b = self.BufferedWriter(rawio) 3741 t = self.TextIOWrapper(b, encoding="ascii") 3742 t.write("456def") 3743 t.x = t 3744 wr = weakref.ref(t) 3745 del t 3746 support.gc_collect() 3747 self.assertIsNone(wr(), wr) 3748 with self.open(os_helper.TESTFN, "rb") as f: 3749 self.assertEqual(f.read(), b"456def") 3750 3751 def test_rwpair_cleared_before_textio(self): 3752 # Issue 13070: TextIOWrapper's finalization would crash when called 3753 # after the reference to the underlying BufferedRWPair's writer got 3754 # cleared by the GC. 3755 for i in range(1000): 3756 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3757 t1 = self.TextIOWrapper(b1, encoding="ascii") 3758 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3759 t2 = self.TextIOWrapper(b2, encoding="ascii") 3760 # circular references 3761 t1.buddy = t2 3762 t2.buddy = t1 3763 support.gc_collect() 3764 3765 def test_del__CHUNK_SIZE_SystemError(self): 3766 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') 3767 with self.assertRaises(AttributeError): 3768 del t._CHUNK_SIZE 3769 3770 def test_internal_buffer_size(self): 3771 # bpo-43260: TextIOWrapper's internal buffer should not store 3772 # data larger than chunk size. 3773 chunk_size = 8192 # default chunk size, updated later 3774 3775 class MockIO(self.MockRawIO): 3776 def write(self, data): 3777 if len(data) > chunk_size: 3778 raise RuntimeError 3779 return super().write(data) 3780 3781 buf = MockIO() 3782 t = self.TextIOWrapper(buf, encoding="ascii") 3783 chunk_size = t._CHUNK_SIZE 3784 t.write("abc") 3785 t.write("def") 3786 # default chunk size is 8192 bytes so t don't write data to buf. 3787 self.assertEqual([], buf._write_stack) 3788 3789 with self.assertRaises(RuntimeError): 3790 t.write("x"*(chunk_size+1)) 3791 3792 self.assertEqual([b"abcdef"], buf._write_stack) 3793 t.write("ghi") 3794 t.write("x"*chunk_size) 3795 self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack) 3796 3797 3798class PyTextIOWrapperTest(TextIOWrapperTest): 3799 io = pyio 3800 shutdown_error = "LookupError: unknown encoding: ascii" 3801 3802 3803class IncrementalNewlineDecoderTest(unittest.TestCase): 3804 3805 def check_newline_decoding_utf8(self, decoder): 3806 # UTF-8 specific tests for a newline decoder 3807 def _check_decode(b, s, **kwargs): 3808 # We exercise getstate() / setstate() as well as decode() 3809 state = decoder.getstate() 3810 self.assertEqual(decoder.decode(b, **kwargs), s) 3811 decoder.setstate(state) 3812 self.assertEqual(decoder.decode(b, **kwargs), s) 3813 3814 _check_decode(b'\xe8\xa2\x88', "\u8888") 3815 3816 _check_decode(b'\xe8', "") 3817 _check_decode(b'\xa2', "") 3818 _check_decode(b'\x88', "\u8888") 3819 3820 _check_decode(b'\xe8', "") 3821 _check_decode(b'\xa2', "") 3822 _check_decode(b'\x88', "\u8888") 3823 3824 _check_decode(b'\xe8', "") 3825 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3826 3827 decoder.reset() 3828 _check_decode(b'\n', "\n") 3829 _check_decode(b'\r', "") 3830 _check_decode(b'', "\n", final=True) 3831 _check_decode(b'\r', "\n", final=True) 3832 3833 _check_decode(b'\r', "") 3834 _check_decode(b'a', "\na") 3835 3836 _check_decode(b'\r\r\n', "\n\n") 3837 _check_decode(b'\r', "") 3838 _check_decode(b'\r', "\n") 3839 _check_decode(b'\na', "\na") 3840 3841 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3842 _check_decode(b'\xe8\xa2\x88', "\u8888") 3843 _check_decode(b'\n', "\n") 3844 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3845 _check_decode(b'\n', "\n") 3846 3847 def check_newline_decoding(self, decoder, encoding): 3848 result = [] 3849 if encoding is not None: 3850 encoder = codecs.getincrementalencoder(encoding)() 3851 def _decode_bytewise(s): 3852 # Decode one byte at a time 3853 for b in encoder.encode(s): 3854 result.append(decoder.decode(bytes([b]))) 3855 else: 3856 encoder = None 3857 def _decode_bytewise(s): 3858 # Decode one char at a time 3859 for c in s: 3860 result.append(decoder.decode(c)) 3861 self.assertEqual(decoder.newlines, None) 3862 _decode_bytewise("abc\n\r") 3863 self.assertEqual(decoder.newlines, '\n') 3864 _decode_bytewise("\nabc") 3865 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3866 _decode_bytewise("abc\r") 3867 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3868 _decode_bytewise("abc") 3869 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3870 _decode_bytewise("abc\r") 3871 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3872 decoder.reset() 3873 input = "abc" 3874 if encoder is not None: 3875 encoder.reset() 3876 input = encoder.encode(input) 3877 self.assertEqual(decoder.decode(input), "abc") 3878 self.assertEqual(decoder.newlines, None) 3879 3880 def test_newline_decoder(self): 3881 encodings = ( 3882 # None meaning the IncrementalNewlineDecoder takes unicode input 3883 # rather than bytes input 3884 None, 'utf-8', 'latin-1', 3885 'utf-16', 'utf-16-le', 'utf-16-be', 3886 'utf-32', 'utf-32-le', 'utf-32-be', 3887 ) 3888 for enc in encodings: 3889 decoder = enc and codecs.getincrementaldecoder(enc)() 3890 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3891 self.check_newline_decoding(decoder, enc) 3892 decoder = codecs.getincrementaldecoder("utf-8")() 3893 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3894 self.check_newline_decoding_utf8(decoder) 3895 self.assertRaises(TypeError, decoder.setstate, 42) 3896 3897 def test_newline_bytes(self): 3898 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3899 def _check(dec): 3900 self.assertEqual(dec.newlines, None) 3901 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3902 self.assertEqual(dec.newlines, None) 3903 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3904 self.assertEqual(dec.newlines, None) 3905 dec = self.IncrementalNewlineDecoder(None, translate=False) 3906 _check(dec) 3907 dec = self.IncrementalNewlineDecoder(None, translate=True) 3908 _check(dec) 3909 3910 def test_translate(self): 3911 # issue 35062 3912 for translate in (-2, -1, 1, 2): 3913 decoder = codecs.getincrementaldecoder("utf-8")() 3914 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3915 self.check_newline_decoding_utf8(decoder) 3916 decoder = codecs.getincrementaldecoder("utf-8")() 3917 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3918 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3919 3920class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3921 pass 3922 3923class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3924 pass 3925 3926 3927# XXX Tests for open() 3928 3929class MiscIOTest(unittest.TestCase): 3930 3931 def tearDown(self): 3932 os_helper.unlink(os_helper.TESTFN) 3933 3934 def test___all__(self): 3935 for name in self.io.__all__: 3936 obj = getattr(self.io, name, None) 3937 self.assertIsNotNone(obj, name) 3938 if name in ("open", "open_code"): 3939 continue 3940 elif "error" in name.lower() or name == "UnsupportedOperation": 3941 self.assertTrue(issubclass(obj, Exception), name) 3942 elif not name.startswith("SEEK_"): 3943 self.assertTrue(issubclass(obj, self.IOBase)) 3944 3945 def test_attributes(self): 3946 f = self.open(os_helper.TESTFN, "wb", buffering=0) 3947 self.assertEqual(f.mode, "wb") 3948 f.close() 3949 3950 with warnings_helper.check_warnings(('', DeprecationWarning)): 3951 f = self.open(os_helper.TESTFN, "U", encoding="utf-8") 3952 self.assertEqual(f.name, os_helper.TESTFN) 3953 self.assertEqual(f.buffer.name, os_helper.TESTFN) 3954 self.assertEqual(f.buffer.raw.name, os_helper.TESTFN) 3955 self.assertEqual(f.mode, "U") 3956 self.assertEqual(f.buffer.mode, "rb") 3957 self.assertEqual(f.buffer.raw.mode, "rb") 3958 f.close() 3959 3960 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") 3961 self.assertEqual(f.mode, "w+") 3962 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3963 self.assertEqual(f.buffer.raw.mode, "rb+") 3964 3965 g = self.open(f.fileno(), "wb", closefd=False) 3966 self.assertEqual(g.mode, "wb") 3967 self.assertEqual(g.raw.mode, "wb") 3968 self.assertEqual(g.name, f.fileno()) 3969 self.assertEqual(g.raw.name, f.fileno()) 3970 f.close() 3971 g.close() 3972 3973 def test_open_pipe_with_append(self): 3974 # bpo-27805: Ignore ESPIPE from lseek() in open(). 3975 r, w = os.pipe() 3976 self.addCleanup(os.close, r) 3977 f = self.open(w, 'a', encoding="utf-8") 3978 self.addCleanup(f.close) 3979 # Check that the file is marked non-seekable. On Windows, however, lseek 3980 # somehow succeeds on pipes. 3981 if sys.platform != 'win32': 3982 self.assertFalse(f.seekable()) 3983 3984 def test_io_after_close(self): 3985 for kwargs in [ 3986 {"mode": "w"}, 3987 {"mode": "wb"}, 3988 {"mode": "w", "buffering": 1}, 3989 {"mode": "w", "buffering": 2}, 3990 {"mode": "wb", "buffering": 0}, 3991 {"mode": "r"}, 3992 {"mode": "rb"}, 3993 {"mode": "r", "buffering": 1}, 3994 {"mode": "r", "buffering": 2}, 3995 {"mode": "rb", "buffering": 0}, 3996 {"mode": "w+"}, 3997 {"mode": "w+b"}, 3998 {"mode": "w+", "buffering": 1}, 3999 {"mode": "w+", "buffering": 2}, 4000 {"mode": "w+b", "buffering": 0}, 4001 ]: 4002 if "b" not in kwargs["mode"]: 4003 kwargs["encoding"] = "utf-8" 4004 f = self.open(os_helper.TESTFN, **kwargs) 4005 f.close() 4006 self.assertRaises(ValueError, f.flush) 4007 self.assertRaises(ValueError, f.fileno) 4008 self.assertRaises(ValueError, f.isatty) 4009 self.assertRaises(ValueError, f.__iter__) 4010 if hasattr(f, "peek"): 4011 self.assertRaises(ValueError, f.peek, 1) 4012 self.assertRaises(ValueError, f.read) 4013 if hasattr(f, "read1"): 4014 self.assertRaises(ValueError, f.read1, 1024) 4015 self.assertRaises(ValueError, f.read1) 4016 if hasattr(f, "readall"): 4017 self.assertRaises(ValueError, f.readall) 4018 if hasattr(f, "readinto"): 4019 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 4020 if hasattr(f, "readinto1"): 4021 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 4022 self.assertRaises(ValueError, f.readline) 4023 self.assertRaises(ValueError, f.readlines) 4024 self.assertRaises(ValueError, f.readlines, 1) 4025 self.assertRaises(ValueError, f.seek, 0) 4026 self.assertRaises(ValueError, f.tell) 4027 self.assertRaises(ValueError, f.truncate) 4028 self.assertRaises(ValueError, f.write, 4029 b"" if "b" in kwargs['mode'] else "") 4030 self.assertRaises(ValueError, f.writelines, []) 4031 self.assertRaises(ValueError, next, f) 4032 4033 def test_blockingioerror(self): 4034 # Various BlockingIOError issues 4035 class C(str): 4036 pass 4037 c = C("") 4038 b = self.BlockingIOError(1, c) 4039 c.b = b 4040 b.c = c 4041 wr = weakref.ref(c) 4042 del c, b 4043 support.gc_collect() 4044 self.assertIsNone(wr(), wr) 4045 4046 def test_abcs(self): 4047 # Test the visible base classes are ABCs. 4048 self.assertIsInstance(self.IOBase, abc.ABCMeta) 4049 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 4050 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 4051 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 4052 4053 def _check_abc_inheritance(self, abcmodule): 4054 with self.open(os_helper.TESTFN, "wb", buffering=0) as f: 4055 self.assertIsInstance(f, abcmodule.IOBase) 4056 self.assertIsInstance(f, abcmodule.RawIOBase) 4057 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4058 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4059 with self.open(os_helper.TESTFN, "wb") as f: 4060 self.assertIsInstance(f, abcmodule.IOBase) 4061 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4062 self.assertIsInstance(f, abcmodule.BufferedIOBase) 4063 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4064 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 4065 self.assertIsInstance(f, abcmodule.IOBase) 4066 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4067 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4068 self.assertIsInstance(f, abcmodule.TextIOBase) 4069 4070 def test_abc_inheritance(self): 4071 # Test implementations inherit from their respective ABCs 4072 self._check_abc_inheritance(self) 4073 4074 def test_abc_inheritance_official(self): 4075 # Test implementations inherit from the official ABCs of the 4076 # baseline "io" module. 4077 self._check_abc_inheritance(io) 4078 4079 def _check_warn_on_dealloc(self, *args, **kwargs): 4080 f = open(*args, **kwargs) 4081 r = repr(f) 4082 with self.assertWarns(ResourceWarning) as cm: 4083 f = None 4084 support.gc_collect() 4085 self.assertIn(r, str(cm.warning.args[0])) 4086 4087 def test_warn_on_dealloc(self): 4088 self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0) 4089 self._check_warn_on_dealloc(os_helper.TESTFN, "wb") 4090 self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8") 4091 4092 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 4093 fds = [] 4094 def cleanup_fds(): 4095 for fd in fds: 4096 try: 4097 os.close(fd) 4098 except OSError as e: 4099 if e.errno != errno.EBADF: 4100 raise 4101 self.addCleanup(cleanup_fds) 4102 r, w = os.pipe() 4103 fds += r, w 4104 self._check_warn_on_dealloc(r, *args, **kwargs) 4105 # When using closefd=False, there's no warning 4106 r, w = os.pipe() 4107 fds += r, w 4108 with warnings_helper.check_no_resource_warning(self): 4109 open(r, *args, closefd=False, **kwargs) 4110 4111 def test_warn_on_dealloc_fd(self): 4112 self._check_warn_on_dealloc_fd("rb", buffering=0) 4113 self._check_warn_on_dealloc_fd("rb") 4114 self._check_warn_on_dealloc_fd("r", encoding="utf-8") 4115 4116 4117 def test_pickling(self): 4118 # Pickling file objects is forbidden 4119 for kwargs in [ 4120 {"mode": "w"}, 4121 {"mode": "wb"}, 4122 {"mode": "wb", "buffering": 0}, 4123 {"mode": "r"}, 4124 {"mode": "rb"}, 4125 {"mode": "rb", "buffering": 0}, 4126 {"mode": "w+"}, 4127 {"mode": "w+b"}, 4128 {"mode": "w+b", "buffering": 0}, 4129 ]: 4130 if "b" not in kwargs["mode"]: 4131 kwargs["encoding"] = "utf-8" 4132 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 4133 with self.open(os_helper.TESTFN, **kwargs) as f: 4134 self.assertRaises(TypeError, pickle.dumps, f, protocol) 4135 4136 def test_nonblock_pipe_write_bigbuf(self): 4137 self._test_nonblock_pipe_write(16*1024) 4138 4139 def test_nonblock_pipe_write_smallbuf(self): 4140 self._test_nonblock_pipe_write(1024) 4141 4142 @unittest.skipUnless(hasattr(os, 'set_blocking'), 4143 'os.set_blocking() required for this test') 4144 def _test_nonblock_pipe_write(self, bufsize): 4145 sent = [] 4146 received = [] 4147 r, w = os.pipe() 4148 os.set_blocking(r, False) 4149 os.set_blocking(w, False) 4150 4151 # To exercise all code paths in the C implementation we need 4152 # to play with buffer sizes. For instance, if we choose a 4153 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 4154 # then we will never get a partial write of the buffer. 4155 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 4156 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 4157 4158 with rf, wf: 4159 for N in 9999, 73, 7574: 4160 try: 4161 i = 0 4162 while True: 4163 msg = bytes([i % 26 + 97]) * N 4164 sent.append(msg) 4165 wf.write(msg) 4166 i += 1 4167 4168 except self.BlockingIOError as e: 4169 self.assertEqual(e.args[0], errno.EAGAIN) 4170 self.assertEqual(e.args[2], e.characters_written) 4171 sent[-1] = sent[-1][:e.characters_written] 4172 received.append(rf.read()) 4173 msg = b'BLOCKED' 4174 wf.write(msg) 4175 sent.append(msg) 4176 4177 while True: 4178 try: 4179 wf.flush() 4180 break 4181 except self.BlockingIOError as e: 4182 self.assertEqual(e.args[0], errno.EAGAIN) 4183 self.assertEqual(e.args[2], e.characters_written) 4184 self.assertEqual(e.characters_written, 0) 4185 received.append(rf.read()) 4186 4187 received += iter(rf.read, None) 4188 4189 sent, received = b''.join(sent), b''.join(received) 4190 self.assertEqual(sent, received) 4191 self.assertTrue(wf.closed) 4192 self.assertTrue(rf.closed) 4193 4194 def test_create_fail(self): 4195 # 'x' mode fails if file is existing 4196 with self.open(os_helper.TESTFN, 'w', encoding="utf-8"): 4197 pass 4198 self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8") 4199 4200 def test_create_writes(self): 4201 # 'x' mode opens for writing 4202 with self.open(os_helper.TESTFN, 'xb') as f: 4203 f.write(b"spam") 4204 with self.open(os_helper.TESTFN, 'rb') as f: 4205 self.assertEqual(b"spam", f.read()) 4206 4207 def test_open_allargs(self): 4208 # there used to be a buffer overflow in the parser for rawmode 4209 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8") 4210 4211 def test_check_encoding_errors(self): 4212 # bpo-37388: open() and TextIOWrapper must check encoding and errors 4213 # arguments in dev mode 4214 mod = self.io.__name__ 4215 filename = __file__ 4216 invalid = 'Boom, Shaka Laka, Boom!' 4217 code = textwrap.dedent(f''' 4218 import sys 4219 from {mod} import open, TextIOWrapper 4220 4221 try: 4222 open({filename!r}, encoding={invalid!r}) 4223 except LookupError: 4224 pass 4225 else: 4226 sys.exit(21) 4227 4228 try: 4229 open({filename!r}, errors={invalid!r}) 4230 except LookupError: 4231 pass 4232 else: 4233 sys.exit(22) 4234 4235 fp = open({filename!r}, "rb") 4236 with fp: 4237 try: 4238 TextIOWrapper(fp, encoding={invalid!r}) 4239 except LookupError: 4240 pass 4241 else: 4242 sys.exit(23) 4243 4244 try: 4245 TextIOWrapper(fp, errors={invalid!r}) 4246 except LookupError: 4247 pass 4248 else: 4249 sys.exit(24) 4250 4251 sys.exit(10) 4252 ''') 4253 proc = assert_python_failure('-X', 'dev', '-c', code) 4254 self.assertEqual(proc.rc, 10, proc) 4255 4256 def test_check_encoding_warning(self): 4257 # PEP 597: Raise warning when encoding is not specified 4258 # and sys.flags.warn_default_encoding is set. 4259 mod = self.io.__name__ 4260 filename = __file__ 4261 code = textwrap.dedent(f'''\ 4262 import sys 4263 from {mod} import open, TextIOWrapper 4264 import pathlib 4265 4266 with open({filename!r}) as f: # line 5 4267 pass 4268 4269 pathlib.Path({filename!r}).read_text() # line 8 4270 ''') 4271 proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code) 4272 warnings = proc.err.splitlines() 4273 self.assertEqual(len(warnings), 2) 4274 self.assertTrue( 4275 warnings[0].startswith(b"<string>:5: EncodingWarning: ")) 4276 self.assertTrue( 4277 warnings[1].startswith(b"<string>:8: EncodingWarning: ")) 4278 4279 @support.cpython_only 4280 # Depending if OpenWrapper was already created or not, the warning is 4281 # emitted or not. For example, the attribute is already created when this 4282 # test is run multiple times. 4283 @warnings_helper.ignore_warnings(category=DeprecationWarning) 4284 def test_openwrapper(self): 4285 self.assertIs(self.io.OpenWrapper, self.io.open) 4286 4287 4288class CMiscIOTest(MiscIOTest): 4289 io = io 4290 4291 def test_readinto_buffer_overflow(self): 4292 # Issue #18025 4293 class BadReader(self.io.BufferedIOBase): 4294 def read(self, n=-1): 4295 return b'x' * 10**6 4296 bufio = BadReader() 4297 b = bytearray(2) 4298 self.assertRaises(ValueError, bufio.readinto, b) 4299 4300 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4301 # Issue #23309: deadlocks at shutdown should be avoided when a 4302 # daemon thread and the main thread both write to a file. 4303 code = """if 1: 4304 import sys 4305 import time 4306 import threading 4307 from test.support import SuppressCrashReport 4308 4309 file = sys.{stream_name} 4310 4311 def run(): 4312 while True: 4313 file.write('.') 4314 file.flush() 4315 4316 crash = SuppressCrashReport() 4317 crash.__enter__() 4318 # don't call __exit__(): the crash occurs at Python shutdown 4319 4320 thread = threading.Thread(target=run) 4321 thread.daemon = True 4322 thread.start() 4323 4324 time.sleep(0.5) 4325 file.write('!') 4326 file.flush() 4327 """.format_map(locals()) 4328 res, _ = run_python_until_end("-c", code) 4329 err = res.err.decode() 4330 if res.rc != 0: 4331 # Failure: should be a fatal error 4332 pattern = (r"Fatal Python error: _enter_buffered_busy: " 4333 r"could not acquire lock " 4334 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> " 4335 r"at interpreter shutdown, possibly due to " 4336 r"daemon threads".format_map(locals())) 4337 self.assertRegex(err, pattern) 4338 else: 4339 self.assertFalse(err.strip('.!')) 4340 4341 def test_daemon_threads_shutdown_stdout_deadlock(self): 4342 self.check_daemon_threads_shutdown_deadlock('stdout') 4343 4344 def test_daemon_threads_shutdown_stderr_deadlock(self): 4345 self.check_daemon_threads_shutdown_deadlock('stderr') 4346 4347 4348class PyMiscIOTest(MiscIOTest): 4349 io = pyio 4350 4351 4352@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 4353class SignalsTest(unittest.TestCase): 4354 4355 def setUp(self): 4356 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 4357 4358 def tearDown(self): 4359 signal.signal(signal.SIGALRM, self.oldalrm) 4360 4361 def alarm_interrupt(self, sig, frame): 4362 1/0 4363 4364 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 4365 """Check that a partial write, when it gets interrupted, properly 4366 invokes the signal handler, and bubbles up the exception raised 4367 in the latter.""" 4368 4369 # XXX This test has three flaws that appear when objects are 4370 # XXX not reference counted. 4371 4372 # - if wio.write() happens to trigger a garbage collection, 4373 # the signal exception may be raised when some __del__ 4374 # method is running; it will not reach the assertRaises() 4375 # call. 4376 4377 # - more subtle, if the wio object is not destroyed at once 4378 # and survives this function, the next opened file is likely 4379 # to have the same fileno (since the file descriptor was 4380 # actively closed). When wio.__del__ is finally called, it 4381 # will close the other's test file... To trigger this with 4382 # CPython, try adding "global wio" in this function. 4383 4384 # - This happens only for streams created by the _pyio module, 4385 # because a wio.close() that fails still consider that the 4386 # file needs to be closed again. You can try adding an 4387 # "assert wio.closed" at the end of the function. 4388 4389 # Fortunately, a little gc.collect() seems to be enough to 4390 # work around all these issues. 4391 support.gc_collect() # For PyPy or other GCs. 4392 4393 read_results = [] 4394 def _read(): 4395 s = os.read(r, 1) 4396 read_results.append(s) 4397 4398 t = threading.Thread(target=_read) 4399 t.daemon = True 4400 r, w = os.pipe() 4401 fdopen_kwargs["closefd"] = False 4402 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) 4403 try: 4404 wio = self.io.open(w, **fdopen_kwargs) 4405 if hasattr(signal, 'pthread_sigmask'): 4406 # create the thread with SIGALRM signal blocked 4407 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) 4408 t.start() 4409 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM]) 4410 else: 4411 t.start() 4412 4413 # Fill the pipe enough that the write will be blocking. 4414 # It will be interrupted by the timer armed above. Since the 4415 # other thread has read one byte, the low-level write will 4416 # return with a successful (partial) result rather than an EINTR. 4417 # The buffered IO layer must check for pending signal 4418 # handlers, which in this case will invoke alarm_interrupt(). 4419 signal.alarm(1) 4420 try: 4421 self.assertRaises(ZeroDivisionError, wio.write, large_data) 4422 finally: 4423 signal.alarm(0) 4424 t.join() 4425 # We got one byte, get another one and check that it isn't a 4426 # repeat of the first one. 4427 read_results.append(os.read(r, 1)) 4428 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 4429 finally: 4430 os.close(w) 4431 os.close(r) 4432 # This is deliberate. If we didn't close the file descriptor 4433 # before closing wio, wio would try to flush its internal 4434 # buffer, and block again. 4435 try: 4436 wio.close() 4437 except OSError as e: 4438 if e.errno != errno.EBADF: 4439 raise 4440 4441 def test_interrupted_write_unbuffered(self): 4442 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 4443 4444 def test_interrupted_write_buffered(self): 4445 self.check_interrupted_write(b"xy", b"xy", mode="wb") 4446 4447 def test_interrupted_write_text(self): 4448 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 4449 4450 @support.no_tracing 4451 def check_reentrant_write(self, data, **fdopen_kwargs): 4452 def on_alarm(*args): 4453 # Will be called reentrantly from the same thread 4454 wio.write(data) 4455 1/0 4456 signal.signal(signal.SIGALRM, on_alarm) 4457 r, w = os.pipe() 4458 wio = self.io.open(w, **fdopen_kwargs) 4459 try: 4460 signal.alarm(1) 4461 # Either the reentrant call to wio.write() fails with RuntimeError, 4462 # or the signal handler raises ZeroDivisionError. 4463 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 4464 while 1: 4465 for i in range(100): 4466 wio.write(data) 4467 wio.flush() 4468 # Make sure the buffer doesn't fill up and block further writes 4469 os.read(r, len(data) * 100) 4470 exc = cm.exception 4471 if isinstance(exc, RuntimeError): 4472 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 4473 finally: 4474 signal.alarm(0) 4475 wio.close() 4476 os.close(r) 4477 4478 def test_reentrant_write_buffered(self): 4479 self.check_reentrant_write(b"xy", mode="wb") 4480 4481 def test_reentrant_write_text(self): 4482 self.check_reentrant_write("xy", mode="w", encoding="ascii") 4483 4484 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 4485 """Check that a buffered read, when it gets interrupted (either 4486 returning a partial result or EINTR), properly invokes the signal 4487 handler and retries if the latter returned successfully.""" 4488 r, w = os.pipe() 4489 fdopen_kwargs["closefd"] = False 4490 def alarm_handler(sig, frame): 4491 os.write(w, b"bar") 4492 signal.signal(signal.SIGALRM, alarm_handler) 4493 try: 4494 rio = self.io.open(r, **fdopen_kwargs) 4495 os.write(w, b"foo") 4496 signal.alarm(1) 4497 # Expected behaviour: 4498 # - first raw read() returns partial b"foo" 4499 # - second raw read() returns EINTR 4500 # - third raw read() returns b"bar" 4501 self.assertEqual(decode(rio.read(6)), "foobar") 4502 finally: 4503 signal.alarm(0) 4504 rio.close() 4505 os.close(w) 4506 os.close(r) 4507 4508 def test_interrupted_read_retry_buffered(self): 4509 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 4510 mode="rb") 4511 4512 def test_interrupted_read_retry_text(self): 4513 self.check_interrupted_read_retry(lambda x: x, 4514 mode="r", encoding="latin1") 4515 4516 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 4517 """Check that a buffered write, when it gets interrupted (either 4518 returning a partial result or EINTR), properly invokes the signal 4519 handler and retries if the latter returned successfully.""" 4520 select = import_helper.import_module("select") 4521 4522 # A quantity that exceeds the buffer size of an anonymous pipe's 4523 # write end. 4524 N = support.PIPE_MAX_SIZE 4525 r, w = os.pipe() 4526 fdopen_kwargs["closefd"] = False 4527 4528 # We need a separate thread to read from the pipe and allow the 4529 # write() to finish. This thread is started after the SIGALRM is 4530 # received (forcing a first EINTR in write()). 4531 read_results = [] 4532 write_finished = False 4533 error = None 4534 def _read(): 4535 try: 4536 while not write_finished: 4537 while r in select.select([r], [], [], 1.0)[0]: 4538 s = os.read(r, 1024) 4539 read_results.append(s) 4540 except BaseException as exc: 4541 nonlocal error 4542 error = exc 4543 t = threading.Thread(target=_read) 4544 t.daemon = True 4545 def alarm1(sig, frame): 4546 signal.signal(signal.SIGALRM, alarm2) 4547 signal.alarm(1) 4548 def alarm2(sig, frame): 4549 t.start() 4550 4551 large_data = item * N 4552 signal.signal(signal.SIGALRM, alarm1) 4553 try: 4554 wio = self.io.open(w, **fdopen_kwargs) 4555 signal.alarm(1) 4556 # Expected behaviour: 4557 # - first raw write() is partial (because of the limited pipe buffer 4558 # and the first alarm) 4559 # - second raw write() returns EINTR (because of the second alarm) 4560 # - subsequent write()s are successful (either partial or complete) 4561 written = wio.write(large_data) 4562 self.assertEqual(N, written) 4563 4564 wio.flush() 4565 write_finished = True 4566 t.join() 4567 4568 self.assertIsNone(error) 4569 self.assertEqual(N, sum(len(x) for x in read_results)) 4570 finally: 4571 signal.alarm(0) 4572 write_finished = True 4573 os.close(w) 4574 os.close(r) 4575 # This is deliberate. If we didn't close the file descriptor 4576 # before closing wio, wio would try to flush its internal 4577 # buffer, and could block (in case of failure). 4578 try: 4579 wio.close() 4580 except OSError as e: 4581 if e.errno != errno.EBADF: 4582 raise 4583 4584 def test_interrupted_write_retry_buffered(self): 4585 self.check_interrupted_write_retry(b"x", mode="wb") 4586 4587 def test_interrupted_write_retry_text(self): 4588 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 4589 4590 4591class CSignalsTest(SignalsTest): 4592 io = io 4593 4594class PySignalsTest(SignalsTest): 4595 io = pyio 4596 4597 # Handling reentrancy issues would slow down _pyio even more, so the 4598 # tests are disabled. 4599 test_reentrant_write_buffered = None 4600 test_reentrant_write_text = None 4601 4602 4603def load_tests(*args): 4604 tests = (CIOTest, PyIOTest, APIMismatchTest, 4605 CBufferedReaderTest, PyBufferedReaderTest, 4606 CBufferedWriterTest, PyBufferedWriterTest, 4607 CBufferedRWPairTest, PyBufferedRWPairTest, 4608 CBufferedRandomTest, PyBufferedRandomTest, 4609 StatefulIncrementalDecoderTest, 4610 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 4611 CTextIOWrapperTest, PyTextIOWrapperTest, 4612 CMiscIOTest, PyMiscIOTest, 4613 CSignalsTest, PySignalsTest, 4614 ) 4615 4616 # Put the namespaces of the IO module we are testing and some useful mock 4617 # classes in the __dict__ of each test. 4618 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 4619 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead, 4620 SlowFlushRawIO) 4621 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 4622 c_io_ns = {name : getattr(io, name) for name in all_members} 4623 py_io_ns = {name : getattr(pyio, name) for name in all_members} 4624 globs = globals() 4625 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 4626 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 4627 for test in tests: 4628 if test.__name__.startswith("C"): 4629 for name, obj in c_io_ns.items(): 4630 setattr(test, name, obj) 4631 elif test.__name__.startswith("Py"): 4632 for name, obj in py_io_ns.items(): 4633 setattr(test, name, obj) 4634 4635 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests]) 4636 return suite 4637 4638if __name__ == "__main__": 4639 unittest.main() 4640