1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import sysconfig 32import threading 33import time 34import unittest 35import warnings 36import weakref 37from collections import deque, UserList 38from itertools import cycle, count 39from test import support 40from test.support.script_helper import assert_python_ok, run_python_until_end 41from test.support import FakePath 42 43import codecs 44import io # C implementation of io 45import _pyio as pyio # Python implementation of io 46 47try: 48 import ctypes 49except ImportError: 50 def byteslike(*pos, **kw): 51 return array.array("b", bytes(*pos, **kw)) 52else: 53 def byteslike(*pos, **kw): 54 """Create a bytes-like object having no string or sequence methods""" 55 data = bytes(*pos, **kw) 56 obj = EmptyStruct() 57 ctypes.resize(obj, len(data)) 58 memoryview(obj).cast("B")[:] = data 59 return obj 
60 class EmptyStruct(ctypes.Structure): 61 pass 62 63_cflags = sysconfig.get_config_var('CFLAGS') or '' 64_config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 65MEMORY_SANITIZER = ( 66 '-fsanitize=memory' in _cflags or 67 '--with-memory-sanitizer' in _config_args 68) 69 70def _default_chunk_size(): 71 """Get the default TextIOWrapper chunk size""" 72 with open(__file__, "r", encoding="latin-1") as f: 73 return f._CHUNK_SIZE 74 75 76class MockRawIOWithoutRead: 77 """A RawIO implementation without read(), so as to exercise the default 78 RawIO.read() which calls readinto().""" 79 80 def __init__(self, read_stack=()): 81 self._read_stack = list(read_stack) 82 self._write_stack = [] 83 self._reads = 0 84 self._extraneous_reads = 0 85 86 def write(self, b): 87 self._write_stack.append(bytes(b)) 88 return len(b) 89 90 def writable(self): 91 return True 92 93 def fileno(self): 94 return 42 95 96 def readable(self): 97 return True 98 99 def seekable(self): 100 return True 101 102 def seek(self, pos, whence): 103 return 0 # wrong but we gotta return something 104 105 def tell(self): 106 return 0 # same comment as above 107 108 def readinto(self, buf): 109 self._reads += 1 110 max_len = len(buf) 111 try: 112 data = self._read_stack[0] 113 except IndexError: 114 self._extraneous_reads += 1 115 return 0 116 if data is None: 117 del self._read_stack[0] 118 return None 119 n = len(data) 120 if len(data) <= max_len: 121 del self._read_stack[0] 122 buf[:n] = data 123 return n 124 else: 125 buf[:] = data[:max_len] 126 self._read_stack[0] = data[max_len:] 127 return max_len 128 129 def truncate(self, pos=None): 130 return pos 131 132class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 133 pass 134 135class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 136 pass 137 138 139class MockRawIO(MockRawIOWithoutRead): 140 141 def read(self, n=None): 142 self._reads += 1 143 try: 144 return self._read_stack.pop(0) 145 except: 146 self._extraneous_reads += 1 
147 return b"" 148 149class CMockRawIO(MockRawIO, io.RawIOBase): 150 pass 151 152class PyMockRawIO(MockRawIO, pyio.RawIOBase): 153 pass 154 155 156class MisbehavedRawIO(MockRawIO): 157 def write(self, b): 158 return super().write(b) * 2 159 160 def read(self, n=None): 161 return super().read(n) * 2 162 163 def seek(self, pos, whence): 164 return -123 165 166 def tell(self): 167 return -456 168 169 def readinto(self, buf): 170 super().readinto(buf) 171 return len(buf) * 5 172 173class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 174 pass 175 176class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 177 pass 178 179 180class SlowFlushRawIO(MockRawIO): 181 def __init__(self): 182 super().__init__() 183 self.in_flush = threading.Event() 184 185 def flush(self): 186 self.in_flush.set() 187 time.sleep(0.25) 188 189class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 190 pass 191 192class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 193 pass 194 195 196class CloseFailureIO(MockRawIO): 197 closed = 0 198 199 def close(self): 200 if not self.closed: 201 self.closed = 1 202 raise OSError 203 204class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 205 pass 206 207class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 208 pass 209 210 211class MockFileIO: 212 213 def __init__(self, data): 214 self.read_history = [] 215 super().__init__(data) 216 217 def read(self, n=None): 218 res = super().read(n) 219 self.read_history.append(None if res is None else len(res)) 220 return res 221 222 def readinto(self, b): 223 res = super().readinto(b) 224 self.read_history.append(res) 225 return res 226 227class CMockFileIO(MockFileIO, io.BytesIO): 228 pass 229 230class PyMockFileIO(MockFileIO, pyio.BytesIO): 231 pass 232 233 234class MockUnseekableIO: 235 def seekable(self): 236 return False 237 238 def seek(self, *args): 239 raise self.UnsupportedOperation("not seekable") 240 241 def tell(self, *args): 242 raise self.UnsupportedOperation("not seekable") 243 244 def 
truncate(self, *args): 245 raise self.UnsupportedOperation("not seekable") 246 247class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 248 UnsupportedOperation = io.UnsupportedOperation 249 250class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 251 UnsupportedOperation = pyio.UnsupportedOperation 252 253 254class MockNonBlockWriterIO: 255 256 def __init__(self): 257 self._write_stack = [] 258 self._blocker_char = None 259 260 def pop_written(self): 261 s = b"".join(self._write_stack) 262 self._write_stack[:] = [] 263 return s 264 265 def block_on(self, char): 266 """Block when a given char is encountered.""" 267 self._blocker_char = char 268 269 def readable(self): 270 return True 271 272 def seekable(self): 273 return True 274 275 def writable(self): 276 return True 277 278 def write(self, b): 279 b = bytes(b) 280 n = -1 281 if self._blocker_char: 282 try: 283 n = b.index(self._blocker_char) 284 except ValueError: 285 pass 286 else: 287 if n > 0: 288 # write data up to the first blocker 289 self._write_stack.append(b[:n]) 290 return n 291 else: 292 # cancel blocker and indicate would block 293 self._blocker_char = None 294 return None 295 self._write_stack.append(b) 296 return len(b) 297 298class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 299 BlockingIOError = io.BlockingIOError 300 301class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 302 BlockingIOError = pyio.BlockingIOError 303 304 305class IOTest(unittest.TestCase): 306 307 def setUp(self): 308 support.unlink(support.TESTFN) 309 310 def tearDown(self): 311 support.unlink(support.TESTFN) 312 313 def write_ops(self, f): 314 self.assertEqual(f.write(b"blah."), 5) 315 f.truncate(0) 316 self.assertEqual(f.tell(), 5) 317 f.seek(0) 318 319 self.assertEqual(f.write(b"blah."), 5) 320 self.assertEqual(f.seek(0), 0) 321 self.assertEqual(f.write(b"Hello."), 6) 322 self.assertEqual(f.tell(), 6) 323 self.assertEqual(f.seek(-1, 1), 5) 324 self.assertEqual(f.tell(), 5) 325 buffer = 
bytearray(b" world\n\n\n") 326 self.assertEqual(f.write(buffer), 9) 327 buffer[:] = b"*" * 9 # Overwrite our copy of the data 328 self.assertEqual(f.seek(0), 0) 329 self.assertEqual(f.write(b"h"), 1) 330 self.assertEqual(f.seek(-1, 2), 13) 331 self.assertEqual(f.tell(), 13) 332 333 self.assertEqual(f.truncate(12), 12) 334 self.assertEqual(f.tell(), 13) 335 self.assertRaises(TypeError, f.seek, 0.0) 336 337 def read_ops(self, f, buffered=False): 338 data = f.read(5) 339 self.assertEqual(data, b"hello") 340 data = byteslike(data) 341 self.assertEqual(f.readinto(data), 5) 342 self.assertEqual(bytes(data), b" worl") 343 data = bytearray(5) 344 self.assertEqual(f.readinto(data), 2) 345 self.assertEqual(len(data), 5) 346 self.assertEqual(data[:2], b"d\n") 347 self.assertEqual(f.seek(0), 0) 348 self.assertEqual(f.read(20), b"hello world\n") 349 self.assertEqual(f.read(1), b"") 350 self.assertEqual(f.readinto(byteslike(b"x")), 0) 351 self.assertEqual(f.seek(-6, 2), 6) 352 self.assertEqual(f.read(5), b"world") 353 self.assertEqual(f.read(0), b"") 354 self.assertEqual(f.readinto(byteslike()), 0) 355 self.assertEqual(f.seek(-6, 1), 5) 356 self.assertEqual(f.read(5), b" worl") 357 self.assertEqual(f.tell(), 10) 358 self.assertRaises(TypeError, f.seek, 0.0) 359 if buffered: 360 f.seek(0) 361 self.assertEqual(f.read(), b"hello world\n") 362 f.seek(6) 363 self.assertEqual(f.read(), b"world\n") 364 self.assertEqual(f.read(), b"") 365 f.seek(0) 366 data = byteslike(5) 367 self.assertEqual(f.readinto1(data), 5) 368 self.assertEqual(bytes(data), b"hello") 369 370 LARGE = 2**31 371 372 def large_file_ops(self, f): 373 assert f.readable() 374 assert f.writable() 375 try: 376 self.assertEqual(f.seek(self.LARGE), self.LARGE) 377 except (OverflowError, ValueError): 378 self.skipTest("no largefile support") 379 self.assertEqual(f.tell(), self.LARGE) 380 self.assertEqual(f.write(b"xxx"), 3) 381 self.assertEqual(f.tell(), self.LARGE + 3) 382 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 383 
self.assertEqual(f.truncate(), self.LARGE + 2) 384 self.assertEqual(f.tell(), self.LARGE + 2) 385 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 386 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 387 self.assertEqual(f.tell(), self.LARGE + 2) 388 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 389 self.assertEqual(f.seek(-1, 2), self.LARGE) 390 self.assertEqual(f.read(2), b"x") 391 392 def test_invalid_operations(self): 393 # Try writing on a file opened in read mode and vice-versa. 394 exc = self.UnsupportedOperation 395 for mode in ("w", "wb"): 396 with self.open(support.TESTFN, mode) as fp: 397 self.assertRaises(exc, fp.read) 398 self.assertRaises(exc, fp.readline) 399 with self.open(support.TESTFN, "wb", buffering=0) as fp: 400 self.assertRaises(exc, fp.read) 401 self.assertRaises(exc, fp.readline) 402 with self.open(support.TESTFN, "rb", buffering=0) as fp: 403 self.assertRaises(exc, fp.write, b"blah") 404 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 405 with self.open(support.TESTFN, "rb") as fp: 406 self.assertRaises(exc, fp.write, b"blah") 407 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 408 with self.open(support.TESTFN, "r") as fp: 409 self.assertRaises(exc, fp.write, "blah") 410 self.assertRaises(exc, fp.writelines, ["blah\n"]) 411 # Non-zero seeking from current or end pos 412 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 413 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 414 415 def test_optional_abilities(self): 416 # Test for OSError when optional APIs are not supported 417 # The purpose of this test is to try fileno(), reading, writing and 418 # seeking operations with various objects that indicate they do not 419 # support these operations. 
420 421 def pipe_reader(): 422 [r, w] = os.pipe() 423 os.close(w) # So that read() is harmless 424 return self.FileIO(r, "r") 425 426 def pipe_writer(): 427 [r, w] = os.pipe() 428 self.addCleanup(os.close, r) 429 # Guarantee that we can write into the pipe without blocking 430 thread = threading.Thread(target=os.read, args=(r, 100)) 431 thread.start() 432 self.addCleanup(thread.join) 433 return self.FileIO(w, "w") 434 435 def buffered_reader(): 436 return self.BufferedReader(self.MockUnseekableIO()) 437 438 def buffered_writer(): 439 return self.BufferedWriter(self.MockUnseekableIO()) 440 441 def buffered_random(): 442 return self.BufferedRandom(self.BytesIO()) 443 444 def buffered_rw_pair(): 445 return self.BufferedRWPair(self.MockUnseekableIO(), 446 self.MockUnseekableIO()) 447 448 def text_reader(): 449 class UnseekableReader(self.MockUnseekableIO): 450 writable = self.BufferedIOBase.writable 451 write = self.BufferedIOBase.write 452 return self.TextIOWrapper(UnseekableReader(), "ascii") 453 454 def text_writer(): 455 class UnseekableWriter(self.MockUnseekableIO): 456 readable = self.BufferedIOBase.readable 457 read = self.BufferedIOBase.read 458 return self.TextIOWrapper(UnseekableWriter(), "ascii") 459 460 tests = ( 461 (pipe_reader, "fr"), (pipe_writer, "fw"), 462 (buffered_reader, "r"), (buffered_writer, "w"), 463 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 464 (text_reader, "r"), (text_writer, "w"), 465 (self.BytesIO, "rws"), (self.StringIO, "rws"), 466 ) 467 for [test, abilities] in tests: 468 with self.subTest(test), test() as obj: 469 readable = "r" in abilities 470 self.assertEqual(obj.readable(), readable) 471 writable = "w" in abilities 472 self.assertEqual(obj.writable(), writable) 473 474 if isinstance(obj, self.TextIOBase): 475 data = "3" 476 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 477 data = b"3" 478 else: 479 self.fail("Unknown base class") 480 481 if "f" in abilities: 482 obj.fileno() 483 else: 484 
self.assertRaises(OSError, obj.fileno) 485 486 if readable: 487 obj.read(1) 488 obj.read() 489 else: 490 self.assertRaises(OSError, obj.read, 1) 491 self.assertRaises(OSError, obj.read) 492 493 if writable: 494 obj.write(data) 495 else: 496 self.assertRaises(OSError, obj.write, data) 497 498 if sys.platform.startswith("win") and test in ( 499 pipe_reader, pipe_writer): 500 # Pipes seem to appear as seekable on Windows 501 continue 502 seekable = "s" in abilities 503 self.assertEqual(obj.seekable(), seekable) 504 505 if seekable: 506 obj.tell() 507 obj.seek(0) 508 else: 509 self.assertRaises(OSError, obj.tell) 510 self.assertRaises(OSError, obj.seek, 0) 511 512 if writable and seekable: 513 obj.truncate() 514 obj.truncate(0) 515 else: 516 self.assertRaises(OSError, obj.truncate) 517 self.assertRaises(OSError, obj.truncate, 0) 518 519 def test_open_handles_NUL_chars(self): 520 fn_with_NUL = 'foo\0bar' 521 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') 522 523 bytes_fn = bytes(fn_with_NUL, 'ascii') 524 with warnings.catch_warnings(): 525 warnings.simplefilter("ignore", DeprecationWarning) 526 self.assertRaises(ValueError, self.open, bytes_fn, 'w') 527 528 def test_raw_file_io(self): 529 with self.open(support.TESTFN, "wb", buffering=0) as f: 530 self.assertEqual(f.readable(), False) 531 self.assertEqual(f.writable(), True) 532 self.assertEqual(f.seekable(), True) 533 self.write_ops(f) 534 with self.open(support.TESTFN, "rb", buffering=0) as f: 535 self.assertEqual(f.readable(), True) 536 self.assertEqual(f.writable(), False) 537 self.assertEqual(f.seekable(), True) 538 self.read_ops(f) 539 540 def test_buffered_file_io(self): 541 with self.open(support.TESTFN, "wb") as f: 542 self.assertEqual(f.readable(), False) 543 self.assertEqual(f.writable(), True) 544 self.assertEqual(f.seekable(), True) 545 self.write_ops(f) 546 with self.open(support.TESTFN, "rb") as f: 547 self.assertEqual(f.readable(), True) 548 self.assertEqual(f.writable(), False) 549 
self.assertEqual(f.seekable(), True) 550 self.read_ops(f, True) 551 552 def test_readline(self): 553 with self.open(support.TESTFN, "wb") as f: 554 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 555 with self.open(support.TESTFN, "rb") as f: 556 self.assertEqual(f.readline(), b"abc\n") 557 self.assertEqual(f.readline(10), b"def\n") 558 self.assertEqual(f.readline(2), b"xy") 559 self.assertEqual(f.readline(4), b"zzy\n") 560 self.assertEqual(f.readline(), b"foo\x00bar\n") 561 self.assertEqual(f.readline(None), b"another line") 562 self.assertRaises(TypeError, f.readline, 5.3) 563 with self.open(support.TESTFN, "r") as f: 564 self.assertRaises(TypeError, f.readline, 5.3) 565 566 def test_readline_nonsizeable(self): 567 # Issue #30061 568 # Crash when readline() returns an object without __len__ 569 class R(self.IOBase): 570 def readline(self): 571 return None 572 self.assertRaises((TypeError, StopIteration), next, R()) 573 574 def test_next_nonsizeable(self): 575 # Issue #30061 576 # Crash when __next__() returns an object without __len__ 577 class R(self.IOBase): 578 def __next__(self): 579 return None 580 self.assertRaises(TypeError, R().readlines, 1) 581 582 def test_raw_bytes_io(self): 583 f = self.BytesIO() 584 self.write_ops(f) 585 data = f.getvalue() 586 self.assertEqual(data, b"hello world\n") 587 f = self.BytesIO(data) 588 self.read_ops(f, True) 589 590 def test_large_file_ops(self): 591 # On Windows and Mac OSX this test consumes large resources; It takes 592 # a long time to build the >2 GiB file and takes >2 GiB of disk space 593 # therefore the resource must be enabled to run this test. 
594 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 595 support.requires( 596 'largefile', 597 'test requires %s bytes and a long time to run' % self.LARGE) 598 with self.open(support.TESTFN, "w+b", 0) as f: 599 self.large_file_ops(f) 600 with self.open(support.TESTFN, "w+b") as f: 601 self.large_file_ops(f) 602 603 def test_with_open(self): 604 for bufsize in (0, 1, 100): 605 f = None 606 with self.open(support.TESTFN, "wb", bufsize) as f: 607 f.write(b"xxx") 608 self.assertEqual(f.closed, True) 609 f = None 610 try: 611 with self.open(support.TESTFN, "wb", bufsize) as f: 612 1/0 613 except ZeroDivisionError: 614 self.assertEqual(f.closed, True) 615 else: 616 self.fail("1/0 didn't raise an exception") 617 618 # issue 5008 619 def test_append_mode_tell(self): 620 with self.open(support.TESTFN, "wb") as f: 621 f.write(b"xxx") 622 with self.open(support.TESTFN, "ab", buffering=0) as f: 623 self.assertEqual(f.tell(), 3) 624 with self.open(support.TESTFN, "ab") as f: 625 self.assertEqual(f.tell(), 3) 626 with self.open(support.TESTFN, "a") as f: 627 self.assertGreater(f.tell(), 0) 628 629 def test_destructor(self): 630 record = [] 631 class MyFileIO(self.FileIO): 632 def __del__(self): 633 record.append(1) 634 try: 635 f = super().__del__ 636 except AttributeError: 637 pass 638 else: 639 f() 640 def close(self): 641 record.append(2) 642 super().close() 643 def flush(self): 644 record.append(3) 645 super().flush() 646 with support.check_warnings(('', ResourceWarning)): 647 f = MyFileIO(support.TESTFN, "wb") 648 f.write(b"xxx") 649 del f 650 support.gc_collect() 651 self.assertEqual(record, [1, 2, 3]) 652 with self.open(support.TESTFN, "rb") as f: 653 self.assertEqual(f.read(), b"xxx") 654 655 def _check_base_destructor(self, base): 656 record = [] 657 class MyIO(base): 658 def __init__(self): 659 # This exercises the availability of attributes on object 660 # destruction. 
661 # (in the C version, close() is called by the tp_dealloc 662 # function, not by __del__) 663 self.on_del = 1 664 self.on_close = 2 665 self.on_flush = 3 666 def __del__(self): 667 record.append(self.on_del) 668 try: 669 f = super().__del__ 670 except AttributeError: 671 pass 672 else: 673 f() 674 def close(self): 675 record.append(self.on_close) 676 super().close() 677 def flush(self): 678 record.append(self.on_flush) 679 super().flush() 680 f = MyIO() 681 del f 682 support.gc_collect() 683 self.assertEqual(record, [1, 2, 3]) 684 685 def test_IOBase_destructor(self): 686 self._check_base_destructor(self.IOBase) 687 688 def test_RawIOBase_destructor(self): 689 self._check_base_destructor(self.RawIOBase) 690 691 def test_BufferedIOBase_destructor(self): 692 self._check_base_destructor(self.BufferedIOBase) 693 694 def test_TextIOBase_destructor(self): 695 self._check_base_destructor(self.TextIOBase) 696 697 def test_close_flushes(self): 698 with self.open(support.TESTFN, "wb") as f: 699 f.write(b"xxx") 700 with self.open(support.TESTFN, "rb") as f: 701 self.assertEqual(f.read(), b"xxx") 702 703 def test_array_writes(self): 704 a = array.array('i', range(10)) 705 n = len(a.tobytes()) 706 def check(f): 707 with f: 708 self.assertEqual(f.write(a), n) 709 f.writelines((a,)) 710 check(self.BytesIO()) 711 check(self.FileIO(support.TESTFN, "w")) 712 check(self.BufferedWriter(self.MockRawIO())) 713 check(self.BufferedRandom(self.MockRawIO())) 714 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 715 716 def test_closefd(self): 717 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 718 closefd=False) 719 720 def test_read_closed(self): 721 with self.open(support.TESTFN, "w") as f: 722 f.write("egg\n") 723 with self.open(support.TESTFN, "r") as f: 724 file = self.open(f.fileno(), "r", closefd=False) 725 self.assertEqual(file.read(), "egg\n") 726 file.seek(0) 727 file.close() 728 self.assertRaises(ValueError, file.read) 729 730 def 
test_no_closefd_with_filename(self): 731 # can't use closefd in combination with a file name 732 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 733 734 def test_closefd_attr(self): 735 with self.open(support.TESTFN, "wb") as f: 736 f.write(b"egg\n") 737 with self.open(support.TESTFN, "r") as f: 738 self.assertEqual(f.buffer.raw.closefd, True) 739 file = self.open(f.fileno(), "r", closefd=False) 740 self.assertEqual(file.buffer.raw.closefd, False) 741 742 def test_garbage_collection(self): 743 # FileIO objects are collected, and collecting them flushes 744 # all data to disk. 745 with support.check_warnings(('', ResourceWarning)): 746 f = self.FileIO(support.TESTFN, "wb") 747 f.write(b"abcxxx") 748 f.f = f 749 wr = weakref.ref(f) 750 del f 751 support.gc_collect() 752 self.assertIsNone(wr(), wr) 753 with self.open(support.TESTFN, "rb") as f: 754 self.assertEqual(f.read(), b"abcxxx") 755 756 def test_unbounded_file(self): 757 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 758 zero = "/dev/zero" 759 if not os.path.exists(zero): 760 self.skipTest("{0} does not exist".format(zero)) 761 if sys.maxsize > 0x7FFFFFFF: 762 self.skipTest("test can only run in a 32-bit address space") 763 if support.real_max_memuse < support._2G: 764 self.skipTest("test requires at least 2 GiB of memory") 765 with self.open(zero, "rb", buffering=0) as f: 766 self.assertRaises(OverflowError, f.read) 767 with self.open(zero, "rb") as f: 768 self.assertRaises(OverflowError, f.read) 769 with self.open(zero, "r") as f: 770 self.assertRaises(OverflowError, f.read) 771 772 def check_flush_error_on_close(self, *args, **kwargs): 773 # Test that the file is closed despite failed flush 774 # and that flush() is called before file closed. 
775 f = self.open(*args, **kwargs) 776 closed = [] 777 def bad_flush(): 778 closed[:] = [f.closed] 779 raise OSError() 780 f.flush = bad_flush 781 self.assertRaises(OSError, f.close) # exception not swallowed 782 self.assertTrue(f.closed) 783 self.assertTrue(closed) # flush() called 784 self.assertFalse(closed[0]) # flush() called before file closed 785 f.flush = lambda: None # break reference loop 786 787 def test_flush_error_on_close(self): 788 # raw file 789 # Issue #5700: io.FileIO calls flush() after file closed 790 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 791 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 792 self.check_flush_error_on_close(fd, 'wb', buffering=0) 793 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 794 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 795 os.close(fd) 796 # buffered io 797 self.check_flush_error_on_close(support.TESTFN, 'wb') 798 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 799 self.check_flush_error_on_close(fd, 'wb') 800 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 801 self.check_flush_error_on_close(fd, 'wb', closefd=False) 802 os.close(fd) 803 # text io 804 self.check_flush_error_on_close(support.TESTFN, 'w') 805 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 806 self.check_flush_error_on_close(fd, 'w') 807 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 808 self.check_flush_error_on_close(fd, 'w', closefd=False) 809 os.close(fd) 810 811 def test_multi_close(self): 812 f = self.open(support.TESTFN, "wb", buffering=0) 813 f.close() 814 f.close() 815 f.close() 816 self.assertRaises(ValueError, f.flush) 817 818 def test_RawIOBase_read(self): 819 # Exercise the default limited RawIOBase.read(n) implementation (which 820 # calls readinto() internally). 
821 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 822 self.assertEqual(rawio.read(2), b"ab") 823 self.assertEqual(rawio.read(2), b"c") 824 self.assertEqual(rawio.read(2), b"d") 825 self.assertEqual(rawio.read(2), None) 826 self.assertEqual(rawio.read(2), b"ef") 827 self.assertEqual(rawio.read(2), b"g") 828 self.assertEqual(rawio.read(2), None) 829 self.assertEqual(rawio.read(2), b"") 830 831 def test_types_have_dict(self): 832 test = ( 833 self.IOBase(), 834 self.RawIOBase(), 835 self.TextIOBase(), 836 self.StringIO(), 837 self.BytesIO() 838 ) 839 for obj in test: 840 self.assertTrue(hasattr(obj, "__dict__")) 841 842 def test_opener(self): 843 with self.open(support.TESTFN, "w") as f: 844 f.write("egg\n") 845 fd = os.open(support.TESTFN, os.O_RDONLY) 846 def opener(path, flags): 847 return fd 848 with self.open("non-existent", "r", opener=opener) as f: 849 self.assertEqual(f.read(), "egg\n") 850 851 def test_bad_opener_negative_1(self): 852 # Issue #27066. 853 def badopener(fname, flags): 854 return -1 855 with self.assertRaises(ValueError) as cm: 856 open('non-existent', 'r', opener=badopener) 857 self.assertEqual(str(cm.exception), 'opener returned -1') 858 859 def test_bad_opener_other_negative(self): 860 # Issue #27066. 
861 def badopener(fname, flags): 862 return -2 863 with self.assertRaises(ValueError) as cm: 864 open('non-existent', 'r', opener=badopener) 865 self.assertEqual(str(cm.exception), 'opener returned -2') 866 867 def test_fileio_closefd(self): 868 # Issue #4841 869 with self.open(__file__, 'rb') as f1, \ 870 self.open(__file__, 'rb') as f2: 871 fileio = self.FileIO(f1.fileno(), closefd=False) 872 # .__init__() must not close f1 873 fileio.__init__(f2.fileno(), closefd=False) 874 f1.readline() 875 # .close() must not close f2 876 fileio.close() 877 f2.readline() 878 879 def test_nonbuffered_textio(self): 880 with support.check_no_resource_warning(self): 881 with self.assertRaises(ValueError): 882 self.open(support.TESTFN, 'w', buffering=0) 883 884 def test_invalid_newline(self): 885 with support.check_no_resource_warning(self): 886 with self.assertRaises(ValueError): 887 self.open(support.TESTFN, 'w', newline='invalid') 888 889 def test_buffered_readinto_mixin(self): 890 # Test the implementation provided by BufferedIOBase 891 class Stream(self.BufferedIOBase): 892 def read(self, size): 893 return b"12345" 894 read1 = read 895 stream = Stream() 896 for method in ("readinto", "readinto1"): 897 with self.subTest(method): 898 buffer = byteslike(5) 899 self.assertEqual(getattr(stream, method)(buffer), 5) 900 self.assertEqual(bytes(buffer), b"12345") 901 902 def test_fspath_support(self): 903 def check_path_succeeds(path): 904 with self.open(path, "w") as f: 905 f.write("egg\n") 906 907 with self.open(path, "r") as f: 908 self.assertEqual(f.read(), "egg\n") 909 910 check_path_succeeds(FakePath(support.TESTFN)) 911 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8'))) 912 913 with self.open(support.TESTFN, "w") as f: 914 bad_path = FakePath(f.fileno()) 915 with self.assertRaises(TypeError): 916 self.open(bad_path, 'w') 917 918 bad_path = FakePath(None) 919 with self.assertRaises(TypeError): 920 self.open(bad_path, 'w') 921 922 bad_path = FakePath(FloatingPointError) 
923 with self.assertRaises(FloatingPointError): 924 self.open(bad_path, 'w') 925 926 # ensure that refcounting is correct with some error conditions 927 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 928 self.open(FakePath(support.TESTFN), 'rwxa') 929 930 def test_RawIOBase_readall(self): 931 # Exercise the default unlimited RawIOBase.read() and readall() 932 # implementations. 933 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 934 self.assertEqual(rawio.read(), b"abcdefg") 935 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 936 self.assertEqual(rawio.readall(), b"abcdefg") 937 938 def test_BufferedIOBase_readinto(self): 939 # Exercise the default BufferedIOBase.readinto() and readinto1() 940 # implementations (which call read() or read1() internally). 941 class Reader(self.BufferedIOBase): 942 def __init__(self, avail): 943 self.avail = avail 944 def read(self, size): 945 result = self.avail[:size] 946 self.avail = self.avail[size:] 947 return result 948 def read1(self, size): 949 """Returns no more than 5 bytes at once""" 950 return self.read(min(size, 5)) 951 tests = ( 952 # (test method, total data available, read buffer size, expected 953 # read size) 954 ("readinto", 10, 5, 5), 955 ("readinto", 10, 6, 6), # More than read1() can return 956 ("readinto", 5, 6, 5), # Buffer larger than total available 957 ("readinto", 6, 7, 6), 958 ("readinto", 10, 0, 0), # Empty buffer 959 ("readinto1", 10, 5, 5), # Result limited to single read1() call 960 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 961 ("readinto1", 5, 6, 5), # Buffer larger than total available 962 ("readinto1", 6, 7, 5), 963 ("readinto1", 10, 0, 0), # Empty buffer 964 ) 965 UNUSED_BYTE = 0x81 966 for test in tests: 967 with self.subTest(test): 968 method, avail, request, result = test 969 reader = Reader(bytes(range(avail))) 970 buffer = bytearray((UNUSED_BYTE,) * request) 971 method = getattr(reader, method) 972 self.assertEqual(method(buffer), 
result) 973 self.assertEqual(len(buffer), request) 974 self.assertSequenceEqual(buffer[:result], range(result)) 975 unused = (UNUSED_BYTE,) * (request - result) 976 self.assertSequenceEqual(buffer[result:], unused) 977 self.assertEqual(len(reader.avail), avail - result) 978
    def test_close_assert(self):
        # close() must propagate the OSError raised by flush() even when the
        # subclass's __setattr__ silently discards every attribute assignment.
        class R(self.IOBase):
            def __setattr__(self, name, value):
                pass
            def flush(self):
                raise OSError()
        f = R()
        # This would cause an assertion failure.
        self.assertRaises(OSError, f.close)


# IOTest variant holding the checks that only apply to the C implementation.
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj  # self-reference: only the cyclic GC can free obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # The weak reference must be dead, i.e. the object was collected.
        self.assertIsNone(wr(), wr)

# IOTest variant for the Python implementation; adds no tests of its own.
class PyIOTest(IOTest):
    pass


# Checks that io.RawIOBase and pyio.RawIOBase expose the same API, in both
# directions, using support.detect_api_mismatch().
@support.cpython_only
class APIMismatchTest(unittest.TestCase):

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        # '__weakref__' is explicitly excluded from the comparison.
        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
                                               ignore=('__weakref__',))
        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')


# Mixin: concrete subclasses supply `tp` (the buffered class under test); the
# mock raw classes (self.MockRawIO, ...) are attributes defined outside this
# view.
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom

    def test_detach(self):
        # detach() returns the raw object once; a second call raises
        # ValueError.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # The wrapper is expected to report fileno() == 42 (presumably the
        # value MockRawIO.fileno() returns; the mock is defined out of view).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Destroying a subclass instance must call the overridden __del__,
        # close() and flush() in exactly that order (recorded as 1, 2, 3).
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # Chain to the base __del__ only if the base defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # Attribute lookup fails on a temporary buffered object, which is
            # then destroyed while the AttributeError is propagating.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() is "<module.qualname>" and gains a name=... part once the
        # raw object has a `name` attribute (str or bytes).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        b = self.tp(raw)
        # raw.name temporarily refers back to the buffered object itself.
        with support.swap_attr(raw, 'name', b):
            try:
                repr(b)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []
        def bad_flush():
            # Snapshot both `closed` flags at the moment flush() runs.
            closed[:] = [b.closed, raw.closed]
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is the
        # one raised, with the flush error attached as its __context__.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same as test_close_error_on_close, but both failures are NameErrors
        # produced by referencing undefined names.
        raw = self.MockRawIO()
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards raises
        # ValueError.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() raise UnsupportedOperation on an unseekable raw
        # object.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x


# Mixin with sys.getsizeof() checks; the methods are CPython-only.
class SizeofTest:

    @support.cpython_only
    def test_sizeof(self):
        # getsizeof() must grow by exactly the difference in buffer_size.
        bufsize1 = 4096
        bufsize2 = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize1)
        size = sys.getsizeof(bufio) - bufsize1
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize2)
        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)

    @support.cpython_only
    def test_buffer_freeing(self) :
        # After close(), getsizeof() no longer includes the buffer bytes.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        size = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size)

# Reader tests shared by the C and Python implementations; subclasses set
# `tp`.
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object; non-positive buffer
        # sizes raise ValueError, and a later valid re-init makes the object
        # readable again.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object built with __new__ only can be deleted safely; read()
        # fails with ValueError or AttributeError until __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return all seven bytes supplied by the
        # three raw chunks.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is the mock's raw-read counter (maintained outside
        # this view); the assertions pin how many raw reads each read1()
        # call may trigger.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"", bufio.read1(0))
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)

    def test_read1_arbitrary(self):
        # read1() with no argument or with -1 returns what is available
        # (buffered data, or the next raw chunk).
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"bc", bufio.read1())
        self.assertEqual(b"d", bufio.read1())
        self.assertEqual(b"efg", bufio.read1(-1))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1())
        self.assertEqual(rawio._reads, 4)

    def test_readinto(self):
        # readinto() fills a 2-byte buffer across raw chunk boundaries; on a
        # short read only the leading bytes of the buffer are overwritten.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Second scenario: the raw stream yields b"abc" and then None.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

# The methods that follow continue the body of BufferedReaderTest.
    def test_readinto1(self):
        # readinto1() must be served from the buffer when possible; the
        # rawio._reads assertions pin the number of raw reads at each step.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Target buffer larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def test_readinto_array(self):
        # readinto() counts bytes, not array elements, when the target has an
        # item size larger than one byte.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto1_array(self):
        # Same check as test_readinto_array, for readinto1().
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto1(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readlines(self):
        # A fresh reader is built for every call because readlines() consumes
        # the stream.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected sizes of the resulting raw reads].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3], [ dlen ] ],
            [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock's own readall() is checked as well.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns only the available data.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() with no argument returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record for the main thread, then re-raise.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Extends the CommonBufferedTests check: the errors must persist
        # after some data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() on top of MisbehavedRawIO (defined outside this
        # view) must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this constructs io.BufferedReader directly instead of
        # self.tp, so the Python-implementation subclass exercises the C
        # classes here as well -- confirm whether that is intentional before
        # changing it.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)


# Runs the reader tests against the C implementation and adds C-only checks.
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse to
        # read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.f = f  # self-reference: only the cyclic GC can free f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must mention
        # the class name.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)


# Runs the reader tests against the Python implementation.
class PyBufferedReaderTest(BufferedReaderTest):
    tp = pyio.BufferedReader


# Writer tests shared by the C and Python implementations; subclasses set
# `tp`.
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object; non-positive buffer
        # sizes raise ValueError, and a later valid re-init makes the object
        # writable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object built with __new__ only can be deleted safely; write()
        # fails with ValueError or AttributeError until __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***" # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        # Writing more than the 8-byte buffer holds forces implicit flushes.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers seek away and come back to the starting position, once
        # with absolute offsets and once with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        # raw.block_on() and raw.pop_written() belong to the mock (defined
        # outside this view); the inline comments describe the expected
        # effect of each blocking point.
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking must flush pending data so that the raw object reflects
        # writes made before the seek.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and a str argument all raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the writer must flush its buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is left where the write ended.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record for the main thread, then re-raise.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write() on top of MisbehavedRawIO
        # (defined outside this view) must raise OSError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write during close() propagates, and the object still
        # ends up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # close() runs in another thread; once the raw flush has started
        # (rawio.in_flush is set), write() must raise ValueError.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        rawio.in_flush.wait()
        self.assertRaises(ValueError, bufio.write, b'spam')
        self.assertTrue(bufio.closed)
        t.join()



# Runs the writer tests against the C implementation and adds C-only checks.
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse to
        # write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            f.x = f  # self-reference: only the cyclic GC can free f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must mention
        # the class name.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)


# Runs the writer tests against the Python implementation.
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter

# Tests for the reader/writer pair class; `tp` is not assigned in this view
# (presumably set by concrete subclasses further down the file).
class BufferedRWPairTest(unittest.TestCase):

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # read() and write() both fail until __init__ has run.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # NOTE(review): subclasses the module-level MockRawIO, not
        # self.MockRawIO.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call because readlines() consumes
        # the stream.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # NOTE(review): the next assertion repeats the previous one verbatim.
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")
        self.assertEqual(pair.read1(), b"def")

    def test_readinto(self):
        # Both readinto() and readinto1() must fill a 5-byte bytes-like
        # object built by the module-level byteslike() helper.
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***" # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume data: the following read() still sees it.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() fails with a NameError: the error propagates,
        # the writer is still closed, and the pair reports closed.
        def reader_close():
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # The writer's close() fails with a NameError: the error propagates,
        # the reader is still closed, and the pair reports not closed.
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

2044 def test_reader_writer_close_error_on_close(self): 2045 def reader_close(): 2046 reader_non_existing 2047 def writer_close(): 2048 writer_non_existing 2049 reader = self.MockRawIO() 2050 reader.close = reader_close 2051 writer = self.MockRawIO() 2052 writer.close = writer_close 2053 pair = self.tp(reader, writer) 
2054 with self.assertRaises(NameError) as err: 2055 pair.close() 2056 self.assertIn('reader_non_existing', str(err.exception)) 2057 self.assertIsInstance(err.exception.__context__, NameError) 2058 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2059 self.assertFalse(pair.closed) 2060 self.assertFalse(reader.closed) 2061 self.assertFalse(writer.closed) 2062 2063 def test_isatty(self): 2064 class SelectableIsAtty(MockRawIO): 2065 def __init__(self, isatty): 2066 MockRawIO.__init__(self) 2067 self._isatty = isatty 2068 2069 def isatty(self): 2070 return self._isatty 2071 2072 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2073 self.assertFalse(pair.isatty()) 2074 2075 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2076 self.assertTrue(pair.isatty()) 2077 2078 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2079 self.assertTrue(pair.isatty()) 2080 2081 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2082 self.assertTrue(pair.isatty()) 2083 2084 def test_weakref_clearing(self): 2085 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2086 ref = weakref.ref(brw) 2087 brw = None 2088 ref = None # Shouldn't segfault. 
2089 2090class CBufferedRWPairTest(BufferedRWPairTest): 2091 tp = io.BufferedRWPair 2092 2093class PyBufferedRWPairTest(BufferedRWPairTest): 2094 tp = pyio.BufferedRWPair 2095 2096 2097class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 2098 read_mode = "rb+" 2099 write_mode = "wb+" 2100 2101 def test_constructor(self): 2102 BufferedReaderTest.test_constructor(self) 2103 BufferedWriterTest.test_constructor(self) 2104 2105 def test_uninitialized(self): 2106 BufferedReaderTest.test_uninitialized(self) 2107 BufferedWriterTest.test_uninitialized(self) 2108 2109 def test_read_and_write(self): 2110 raw = self.MockRawIO((b"asdf", b"ghjk")) 2111 rw = self.tp(raw, 8) 2112 2113 self.assertEqual(b"as", rw.read(2)) 2114 rw.write(b"ddd") 2115 rw.write(b"eee") 2116 self.assertFalse(raw._write_stack) # Buffer writes 2117 self.assertEqual(b"ghjk", rw.read()) 2118 self.assertEqual(b"dddeee", raw._write_stack[0]) 2119 2120 def test_seek_and_tell(self): 2121 raw = self.BytesIO(b"asdfghjkl") 2122 rw = self.tp(raw) 2123 2124 self.assertEqual(b"as", rw.read(2)) 2125 self.assertEqual(2, rw.tell()) 2126 rw.seek(0, 0) 2127 self.assertEqual(b"asdf", rw.read(4)) 2128 2129 rw.write(b"123f") 2130 rw.seek(0, 0) 2131 self.assertEqual(b"asdf123fl", rw.read()) 2132 self.assertEqual(9, rw.tell()) 2133 rw.seek(-4, 2) 2134 self.assertEqual(5, rw.tell()) 2135 rw.seek(2, 1) 2136 self.assertEqual(7, rw.tell()) 2137 self.assertEqual(b"fl", rw.read(11)) 2138 rw.flush() 2139 self.assertEqual(b"asdf123fl", raw.getvalue()) 2140 2141 self.assertRaises(TypeError, rw.seek, 0.0) 2142 2143 def check_flush_and_read(self, read_func): 2144 raw = self.BytesIO(b"abcdefghi") 2145 bufio = self.tp(raw) 2146 2147 self.assertEqual(b"ab", read_func(bufio, 2)) 2148 bufio.write(b"12") 2149 self.assertEqual(b"ef", read_func(bufio, 2)) 2150 self.assertEqual(6, bufio.tell()) 2151 bufio.flush() 2152 self.assertEqual(6, bufio.tell()) 2153 self.assertEqual(b"ghi", read_func(bufio)) 2154 raw.seek(0, 0) 2155 
raw.write(b"XYZ") 2156 # flush() resets the read buffer 2157 bufio.flush() 2158 bufio.seek(0, 0) 2159 self.assertEqual(b"XYZ", read_func(bufio, 3)) 2160 2161 def test_flush_and_read(self): 2162 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 2163 2164 def test_flush_and_readinto(self): 2165 def _readinto(bufio, n=-1): 2166 b = bytearray(n if n >= 0 else 9999) 2167 n = bufio.readinto(b) 2168 return bytes(b[:n]) 2169 self.check_flush_and_read(_readinto) 2170 2171 def test_flush_and_peek(self): 2172 def _peek(bufio, n=-1): 2173 # This relies on the fact that the buffer can contain the whole 2174 # raw stream, otherwise peek() can return less. 2175 b = bufio.peek(n) 2176 if n != -1: 2177 b = b[:n] 2178 bufio.seek(len(b), 1) 2179 return b 2180 self.check_flush_and_read(_peek) 2181 2182 def test_flush_and_write(self): 2183 raw = self.BytesIO(b"abcdefghi") 2184 bufio = self.tp(raw) 2185 2186 bufio.write(b"123") 2187 bufio.flush() 2188 bufio.write(b"45") 2189 bufio.flush() 2190 bufio.seek(0, 0) 2191 self.assertEqual(b"12345fghi", raw.getvalue()) 2192 self.assertEqual(b"12345fghi", bufio.read()) 2193 2194 def test_threads(self): 2195 BufferedReaderTest.test_threads(self) 2196 BufferedWriterTest.test_threads(self) 2197 2198 def test_writes_and_peek(self): 2199 def _peek(bufio): 2200 bufio.peek(1) 2201 self.check_writes(_peek) 2202 def _peek(bufio): 2203 pos = bufio.tell() 2204 bufio.seek(-1, 1) 2205 bufio.peek(1) 2206 bufio.seek(pos, 0) 2207 self.check_writes(_peek) 2208 2209 def test_writes_and_reads(self): 2210 def _read(bufio): 2211 bufio.seek(-1, 1) 2212 bufio.read(1) 2213 self.check_writes(_read) 2214 2215 def test_writes_and_read1s(self): 2216 def _read1(bufio): 2217 bufio.seek(-1, 1) 2218 bufio.read1(1) 2219 self.check_writes(_read1) 2220 2221 def test_writes_and_readintos(self): 2222 def _read(bufio): 2223 bufio.seek(-1, 1) 2224 bufio.readinto(bytearray(1)) 2225 self.check_writes(_read) 2226 2227 def test_write_after_readahead(self): 2228 # Issue 
#6629: writing after the buffer was filled by readahead should 2229 # first rewind the raw stream. 2230 for overwrite_size in [1, 5]: 2231 raw = self.BytesIO(b"A" * 10) 2232 bufio = self.tp(raw, 4) 2233 # Trigger readahead 2234 self.assertEqual(bufio.read(1), b"A") 2235 self.assertEqual(bufio.tell(), 1) 2236 # Overwriting should rewind the raw stream if it needs so 2237 bufio.write(b"B" * overwrite_size) 2238 self.assertEqual(bufio.tell(), overwrite_size + 1) 2239 # If the write size was smaller than the buffer size, flush() and 2240 # check that rewind happens. 2241 bufio.flush() 2242 self.assertEqual(bufio.tell(), overwrite_size + 1) 2243 s = raw.getvalue() 2244 self.assertEqual(s, 2245 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 2246 2247 def test_write_rewind_write(self): 2248 # Various combinations of reading / writing / seeking backwards / writing again 2249 def mutate(bufio, pos1, pos2): 2250 assert pos2 >= pos1 2251 # Fill the buffer 2252 bufio.seek(pos1) 2253 bufio.read(pos2 - pos1) 2254 bufio.write(b'\x02') 2255 # This writes earlier than the previous write, but still inside 2256 # the buffer. 
2257 bufio.seek(pos1) 2258 bufio.write(b'\x01') 2259 2260 b = b"\x80\x81\x82\x83\x84" 2261 for i in range(0, len(b)): 2262 for j in range(i, len(b)): 2263 raw = self.BytesIO(b) 2264 bufio = self.tp(raw, 100) 2265 mutate(bufio, i, j) 2266 bufio.flush() 2267 expected = bytearray(b) 2268 expected[j] = 2 2269 expected[i] = 1 2270 self.assertEqual(raw.getvalue(), expected, 2271 "failed result for i=%d, j=%d" % (i, j)) 2272 2273 def test_truncate_after_read_or_write(self): 2274 raw = self.BytesIO(b"A" * 10) 2275 bufio = self.tp(raw, 100) 2276 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 2277 self.assertEqual(bufio.truncate(), 2) 2278 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 2279 self.assertEqual(bufio.truncate(), 4) 2280 2281 def test_misbehaved_io(self): 2282 BufferedReaderTest.test_misbehaved_io(self) 2283 BufferedWriterTest.test_misbehaved_io(self) 2284 2285 def test_interleaved_read_write(self): 2286 # Test for issue #12213 2287 with self.BytesIO(b'abcdefgh') as raw: 2288 with self.tp(raw, 100) as f: 2289 f.write(b"1") 2290 self.assertEqual(f.read(1), b'b') 2291 f.write(b'2') 2292 self.assertEqual(f.read1(1), b'd') 2293 f.write(b'3') 2294 buf = bytearray(1) 2295 f.readinto(buf) 2296 self.assertEqual(buf, b'f') 2297 f.write(b'4') 2298 self.assertEqual(f.peek(1), b'h') 2299 f.flush() 2300 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 2301 2302 with self.BytesIO(b'abc') as raw: 2303 with self.tp(raw, 100) as f: 2304 self.assertEqual(f.read(1), b'a') 2305 f.write(b"2") 2306 self.assertEqual(f.read(1), b'c') 2307 f.flush() 2308 self.assertEqual(raw.getvalue(), b'a2c') 2309 2310 def test_interleaved_readline_write(self): 2311 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 2312 with self.tp(raw) as f: 2313 f.write(b'1') 2314 self.assertEqual(f.readline(), b'b\n') 2315 f.write(b'2') 2316 self.assertEqual(f.readline(), b'def\n') 2317 f.write(b'3') 2318 self.assertEqual(f.readline(), b'\n') 2319 f.flush() 2320 
self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 2321 2322 # You can't construct a BufferedRandom over a non-seekable stream. 2323 test_unseekable = None 2324 2325 2326class CBufferedRandomTest(BufferedRandomTest, SizeofTest): 2327 tp = io.BufferedRandom 2328 2329 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 2330 "instead of returning NULL for malloc failure.") 2331 def test_constructor(self): 2332 BufferedRandomTest.test_constructor(self) 2333 # The allocation can succeed on 32-bit builds, e.g. with more 2334 # than 2 GiB RAM and a 64-bit kernel. 2335 if sys.maxsize > 0x7FFFFFFF: 2336 rawio = self.MockRawIO() 2337 bufio = self.tp(rawio) 2338 self.assertRaises((OverflowError, MemoryError, ValueError), 2339 bufio.__init__, rawio, sys.maxsize) 2340 2341 def test_garbage_collection(self): 2342 CBufferedReaderTest.test_garbage_collection(self) 2343 CBufferedWriterTest.test_garbage_collection(self) 2344 2345 def test_args_error(self): 2346 # Issue #17275 2347 with self.assertRaisesRegex(TypeError, "BufferedRandom"): 2348 self.tp(io.BytesIO(), 1024, 1024, 1024) 2349 2350 2351class PyBufferedRandomTest(BufferedRandomTest): 2352 tp = pyio.BufferedRandom 2353 2354 2355# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 2356# properties: 2357# - A single output character can correspond to many bytes of input. 2358# - The number of input bytes to complete the character can be 2359# undetermined until the last input byte is received. 2360# - The number of input bytes can vary depending on previous input. 2361# - A single input byte can correspond to many characters of output. 2362# - The number of output characters can be undetermined until the 2363# last input byte is received. 2364# - The number of output characters can vary depending on previous input. 2365 2366class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2367 """ 2368 For testing seek/tell behavior with a stateful, buffering decoder. 
2369 2370 Input is a sequence of words. Words may be fixed-length (length set 2371 by input) or variable-length (period-terminated). In variable-length 2372 mode, extra periods are ignored. Possible words are: 2373 - 'i' followed by a number sets the input length, I (maximum 99). 2374 When I is set to 0, words are space-terminated. 2375 - 'o' followed by a number sets the output length, O (maximum 99). 2376 - Any other word is converted into a word followed by a period on 2377 the output. The output word consists of the input word truncated 2378 or padded out with hyphens to make its length equal to O. If O 2379 is 0, the word is output verbatim without truncating or padding. 2380 I and O are initially set to 1. When I changes, any buffered input is 2381 re-scanned according to the new I. EOF also terminates the last word. 2382 """ 2383 2384 def __init__(self, errors='strict'): 2385 codecs.IncrementalDecoder.__init__(self, errors) 2386 self.reset() 2387 2388 def __repr__(self): 2389 return '<SID %x>' % id(self) 2390 2391 def reset(self): 2392 self.i = 1 2393 self.o = 1 2394 self.buffer = bytearray() 2395 2396 def getstate(self): 2397 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2398 return bytes(self.buffer), i*100 + o 2399 2400 def setstate(self, state): 2401 buffer, io = state 2402 self.buffer = bytearray(buffer) 2403 i, o = divmod(io, 100) 2404 self.i, self.o = i ^ 1, o ^ 1 2405 2406 def decode(self, input, final=False): 2407 output = '' 2408 for b in input: 2409 if self.i == 0: # variable-length, terminated with period 2410 if b == ord('.'): 2411 if self.buffer: 2412 output += self.process_word() 2413 else: 2414 self.buffer.append(b) 2415 else: # fixed-length, terminate after self.i bytes 2416 self.buffer.append(b) 2417 if len(self.buffer) == self.i: 2418 output += self.process_word() 2419 if final and self.buffer: # EOF terminates the last word 2420 output += self.process_word() 2421 return output 2422 2423 def process_word(self): 2424 
output = '' 2425 if self.buffer[0] == ord('i'): 2426 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2427 elif self.buffer[0] == ord('o'): 2428 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2429 else: 2430 output = self.buffer.decode('ascii') 2431 if len(output) < self.o: 2432 output += '-'*self.o # pad out with hyphens 2433 if self.o: 2434 output = output[:self.o] # truncate to output length 2435 output += '.' 2436 self.buffer = bytearray() 2437 return output 2438 2439 codecEnabled = False 2440 2441 @classmethod 2442 def lookupTestDecoder(cls, name): 2443 if cls.codecEnabled and name == 'test_decoder': 2444 latin1 = codecs.lookup('latin-1') 2445 return codecs.CodecInfo( 2446 name='test_decoder', encode=latin1.encode, decode=None, 2447 incrementalencoder=None, 2448 streamreader=None, streamwriter=None, 2449 incrementaldecoder=cls) 2450 2451# Register the previous decoder for testing. 2452# Disabled by default, tests will enable it. 2453codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 2454 2455 2456class StatefulIncrementalDecoderTest(unittest.TestCase): 2457 """ 2458 Make sure the StatefulIncrementalDecoder actually works. 
2459 """ 2460 2461 test_cases = [ 2462 # I=1, O=1 (fixed-length input == fixed-length output) 2463 (b'abcd', False, 'a.b.c.d.'), 2464 # I=0, O=0 (variable-length input, variable-length output) 2465 (b'oiabcd', True, 'abcd.'), 2466 # I=0, O=0 (should ignore extra periods) 2467 (b'oi...abcd...', True, 'abcd.'), 2468 # I=0, O=6 (variable-length input, fixed-length output) 2469 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2470 # I=2, O=6 (fixed-length input < fixed-length output) 2471 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2472 # I=6, O=3 (fixed-length input > fixed-length output) 2473 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2474 # I=0, then 3; O=29, then 15 (with longer output) 2475 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2476 'a----------------------------.' + 2477 'b----------------------------.' + 2478 'cde--------------------------.' + 2479 'abcdefghijabcde.' + 2480 'a.b------------.' + 2481 '.c.------------.' + 2482 'd.e------------.' + 2483 'k--------------.' + 2484 'l--------------.' + 2485 'm--------------.') 2486 ] 2487 2488 def test_decoder(self): 2489 # Try a few one-shot test cases. 2490 for input, eof, output in self.test_cases: 2491 d = StatefulIncrementalDecoder() 2492 self.assertEqual(d.decode(input, eof), output) 2493 2494 # Also test an unfinished decode, followed by forcing EOF. 
2495 d = StatefulIncrementalDecoder() 2496 self.assertEqual(d.decode(b'oiabcd'), '') 2497 self.assertEqual(d.decode(b'', 1), 'abcd.') 2498 2499class TextIOWrapperTest(unittest.TestCase): 2500 2501 def setUp(self): 2502 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2503 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2504 support.unlink(support.TESTFN) 2505 2506 def tearDown(self): 2507 support.unlink(support.TESTFN) 2508 2509 def test_constructor(self): 2510 r = self.BytesIO(b"\xc3\xa9\n\n") 2511 b = self.BufferedReader(r, 1000) 2512 t = self.TextIOWrapper(b) 2513 t.__init__(b, encoding="latin-1", newline="\r\n") 2514 self.assertEqual(t.encoding, "latin-1") 2515 self.assertEqual(t.line_buffering, False) 2516 t.__init__(b, encoding="utf-8", line_buffering=True) 2517 self.assertEqual(t.encoding, "utf-8") 2518 self.assertEqual(t.line_buffering, True) 2519 self.assertEqual("\xe9\n", t.readline()) 2520 self.assertRaises(TypeError, t.__init__, b, newline=42) 2521 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2522 2523 def test_uninitialized(self): 2524 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2525 del t 2526 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2527 self.assertRaises(Exception, repr, t) 2528 self.assertRaisesRegex((ValueError, AttributeError), 2529 'uninitialized|has no attribute', 2530 t.read, 0) 2531 t.__init__(self.MockRawIO()) 2532 self.assertEqual(t.read(0), '') 2533 2534 def test_non_text_encoding_codecs_are_rejected(self): 2535 # Ensure the constructor complains if passed a codec that isn't 2536 # marked as a text encoding 2537 # http://bugs.python.org/issue20404 2538 r = self.BytesIO() 2539 b = self.BufferedWriter(r) 2540 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2541 self.TextIOWrapper(b, encoding="hex") 2542 2543 def test_detach(self): 2544 r = self.BytesIO() 2545 b = self.BufferedWriter(r) 2546 t = self.TextIOWrapper(b) 2547 self.assertIs(t.detach(), b) 2548 2549 t = 
self.TextIOWrapper(b, encoding="ascii") 2550 t.write("howdy") 2551 self.assertFalse(r.getvalue()) 2552 t.detach() 2553 self.assertEqual(r.getvalue(), b"howdy") 2554 self.assertRaises(ValueError, t.detach) 2555 2556 # Operations independent of the detached stream should still work 2557 repr(t) 2558 self.assertEqual(t.encoding, "ascii") 2559 self.assertEqual(t.errors, "strict") 2560 self.assertFalse(t.line_buffering) 2561 self.assertFalse(t.write_through) 2562 2563 def test_repr(self): 2564 raw = self.BytesIO("hello".encode("utf-8")) 2565 b = self.BufferedReader(raw) 2566 t = self.TextIOWrapper(b, encoding="utf-8") 2567 modname = self.TextIOWrapper.__module__ 2568 self.assertEqual(repr(t), 2569 "<%s.TextIOWrapper encoding='utf-8'>" % modname) 2570 raw.name = "dummy" 2571 self.assertEqual(repr(t), 2572 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2573 t.mode = "r" 2574 self.assertEqual(repr(t), 2575 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2576 raw.name = b"dummy" 2577 self.assertEqual(repr(t), 2578 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2579 2580 t.buffer.detach() 2581 repr(t) # Should not raise an exception 2582 2583 def test_recursive_repr(self): 2584 # Issue #25455 2585 raw = self.BytesIO() 2586 t = self.TextIOWrapper(raw) 2587 with support.swap_attr(raw, 'name', t): 2588 try: 2589 repr(t) # Should not crash 2590 except RuntimeError: 2591 pass 2592 2593 def test_line_buffering(self): 2594 r = self.BytesIO() 2595 b = self.BufferedWriter(r, 1000) 2596 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 2597 t.write("X") 2598 self.assertEqual(r.getvalue(), b"") # No flush happened 2599 t.write("Y\nZ") 2600 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2601 t.write("A\rB") 2602 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2603 2604 def test_reconfigure_line_buffering(self): 2605 r = self.BytesIO() 2606 b = self.BufferedWriter(r, 1000) 2607 t = self.TextIOWrapper(b, 
newline="\n", line_buffering=False) 2608 t.write("AB\nC") 2609 self.assertEqual(r.getvalue(), b"") 2610 2611 t.reconfigure(line_buffering=True) # implicit flush 2612 self.assertEqual(r.getvalue(), b"AB\nC") 2613 t.write("DEF\nG") 2614 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2615 t.write("H") 2616 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2617 t.reconfigure(line_buffering=False) # implicit flush 2618 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2619 t.write("IJ") 2620 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2621 2622 # Keeping default value 2623 t.reconfigure() 2624 t.reconfigure(line_buffering=None) 2625 self.assertEqual(t.line_buffering, False) 2626 t.reconfigure(line_buffering=True) 2627 t.reconfigure() 2628 t.reconfigure(line_buffering=None) 2629 self.assertEqual(t.line_buffering, True) 2630 2631 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2632 def test_default_encoding(self): 2633 old_environ = dict(os.environ) 2634 try: 2635 # try to get a user preferred encoding different than the current 2636 # locale encoding to check that TextIOWrapper() uses the current 2637 # locale encoding and not the user preferred encoding 2638 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2639 if key in os.environ: 2640 del os.environ[key] 2641 2642 current_locale_encoding = locale.getpreferredencoding(False) 2643 b = self.BytesIO() 2644 t = self.TextIOWrapper(b) 2645 self.assertEqual(t.encoding, current_locale_encoding) 2646 finally: 2647 os.environ.clear() 2648 os.environ.update(old_environ) 2649 2650 @support.cpython_only 2651 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2652 def test_device_encoding(self): 2653 # Issue 15989 2654 import _testcapi 2655 b = self.BytesIO() 2656 b.fileno = lambda: _testcapi.INT_MAX + 1 2657 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2658 b.fileno = lambda: _testcapi.UINT_MAX + 1 2659 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2660 2661 def test_encoding(self): 2662 # 
Check the encoding attribute is always set, and valid 2663 b = self.BytesIO() 2664 t = self.TextIOWrapper(b, encoding="utf-8") 2665 self.assertEqual(t.encoding, "utf-8") 2666 t = self.TextIOWrapper(b) 2667 self.assertIsNotNone(t.encoding) 2668 codecs.lookup(t.encoding) 2669 2670 def test_encoding_errors_reading(self): 2671 # (1) default 2672 b = self.BytesIO(b"abc\n\xff\n") 2673 t = self.TextIOWrapper(b, encoding="ascii") 2674 self.assertRaises(UnicodeError, t.read) 2675 # (2) explicit strict 2676 b = self.BytesIO(b"abc\n\xff\n") 2677 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2678 self.assertRaises(UnicodeError, t.read) 2679 # (3) ignore 2680 b = self.BytesIO(b"abc\n\xff\n") 2681 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2682 self.assertEqual(t.read(), "abc\n\n") 2683 # (4) replace 2684 b = self.BytesIO(b"abc\n\xff\n") 2685 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2686 self.assertEqual(t.read(), "abc\n\ufffd\n") 2687 2688 def test_encoding_errors_writing(self): 2689 # (1) default 2690 b = self.BytesIO() 2691 t = self.TextIOWrapper(b, encoding="ascii") 2692 self.assertRaises(UnicodeError, t.write, "\xff") 2693 # (2) explicit strict 2694 b = self.BytesIO() 2695 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2696 self.assertRaises(UnicodeError, t.write, "\xff") 2697 # (3) ignore 2698 b = self.BytesIO() 2699 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2700 newline="\n") 2701 t.write("abc\xffdef\n") 2702 t.flush() 2703 self.assertEqual(b.getvalue(), b"abcdef\n") 2704 # (4) replace 2705 b = self.BytesIO() 2706 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2707 newline="\n") 2708 t.write("abc\xffdef\n") 2709 t.flush() 2710 self.assertEqual(b.getvalue(), b"abc?def\n") 2711 2712 def test_newlines(self): 2713 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2714 2715 tests = [ 2716 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2717 [ 
'', input_lines ], 2718 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2719 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2720 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2721 ] 2722 encodings = ( 2723 'utf-8', 'latin-1', 2724 'utf-16', 'utf-16-le', 'utf-16-be', 2725 'utf-32', 'utf-32-le', 'utf-32-be', 2726 ) 2727 2728 # Try a range of buffer sizes to test the case where \r is the last 2729 # character in TextIOWrapper._pending_line. 2730 for encoding in encodings: 2731 # XXX: str.encode() should return bytes 2732 data = bytes(''.join(input_lines).encode(encoding)) 2733 for do_reads in (False, True): 2734 for bufsize in range(1, 10): 2735 for newline, exp_lines in tests: 2736 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2737 textio = self.TextIOWrapper(bufio, newline=newline, 2738 encoding=encoding) 2739 if do_reads: 2740 got_lines = [] 2741 while True: 2742 c2 = textio.read(2) 2743 if c2 == '': 2744 break 2745 self.assertEqual(len(c2), 2) 2746 got_lines.append(c2 + textio.readline()) 2747 else: 2748 got_lines = list(textio) 2749 2750 for got_line, exp_line in zip(got_lines, exp_lines): 2751 self.assertEqual(got_line, exp_line) 2752 self.assertEqual(len(got_lines), len(exp_lines)) 2753 2754 def test_newlines_input(self): 2755 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2756 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2757 for newline, expected in [ 2758 (None, normalized.decode("ascii").splitlines(keepends=True)), 2759 ("", testdata.decode("ascii").splitlines(keepends=True)), 2760 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2761 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2762 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2763 ]: 2764 buf = self.BytesIO(testdata) 2765 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2766 self.assertEqual(txt.readlines(), expected) 2767 txt.seek(0) 2768 
self.assertEqual(txt.read(), "".join(expected)) 2769 2770 def test_newlines_output(self): 2771 testdict = { 2772 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2773 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2774 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2775 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2776 } 2777 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2778 for newline, expected in tests: 2779 buf = self.BytesIO() 2780 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2781 txt.write("AAA\nB") 2782 txt.write("BB\nCCC\n") 2783 txt.write("X\rY\r\nZ") 2784 txt.flush() 2785 self.assertEqual(buf.closed, False) 2786 self.assertEqual(buf.getvalue(), expected) 2787 2788 def test_destructor(self): 2789 l = [] 2790 base = self.BytesIO 2791 class MyBytesIO(base): 2792 def close(self): 2793 l.append(self.getvalue()) 2794 base.close(self) 2795 b = MyBytesIO() 2796 t = self.TextIOWrapper(b, encoding="ascii") 2797 t.write("abc") 2798 del t 2799 support.gc_collect() 2800 self.assertEqual([b"abc"], l) 2801 2802 def test_override_destructor(self): 2803 record = [] 2804 class MyTextIO(self.TextIOWrapper): 2805 def __del__(self): 2806 record.append(1) 2807 try: 2808 f = super().__del__ 2809 except AttributeError: 2810 pass 2811 else: 2812 f() 2813 def close(self): 2814 record.append(2) 2815 super().close() 2816 def flush(self): 2817 record.append(3) 2818 super().flush() 2819 b = self.BytesIO() 2820 t = MyTextIO(b, encoding="ascii") 2821 del t 2822 support.gc_collect() 2823 self.assertEqual(record, [1, 2, 3]) 2824 2825 def test_error_through_destructor(self): 2826 # Test that the exception state is not modified by a destructor, 2827 # even if close() fails. 
2828 rawio = self.CloseFailureIO() 2829 def f(): 2830 self.TextIOWrapper(rawio).xyzzy 2831 with support.captured_output("stderr") as s: 2832 self.assertRaises(AttributeError, f) 2833 s = s.getvalue().strip() 2834 if s: 2835 # The destructor *may* have printed an unraisable error, check it 2836 self.assertEqual(len(s.splitlines()), 1) 2837 self.assertTrue(s.startswith("Exception OSError: "), s) 2838 self.assertTrue(s.endswith(" ignored"), s) 2839 2840 # Systematic tests of the text I/O API 2841 2842 def test_basic_io(self): 2843 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2844 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2845 f = self.open(support.TESTFN, "w+", encoding=enc) 2846 f._CHUNK_SIZE = chunksize 2847 self.assertEqual(f.write("abc"), 3) 2848 f.close() 2849 f = self.open(support.TESTFN, "r+", encoding=enc) 2850 f._CHUNK_SIZE = chunksize 2851 self.assertEqual(f.tell(), 0) 2852 self.assertEqual(f.read(), "abc") 2853 cookie = f.tell() 2854 self.assertEqual(f.seek(0), 0) 2855 self.assertEqual(f.read(None), "abc") 2856 f.seek(0) 2857 self.assertEqual(f.read(2), "ab") 2858 self.assertEqual(f.read(1), "c") 2859 self.assertEqual(f.read(1), "") 2860 self.assertEqual(f.read(), "") 2861 self.assertEqual(f.tell(), cookie) 2862 self.assertEqual(f.seek(0), 0) 2863 self.assertEqual(f.seek(0, 2), cookie) 2864 self.assertEqual(f.write("def"), 3) 2865 self.assertEqual(f.seek(cookie), cookie) 2866 self.assertEqual(f.read(), "def") 2867 if enc.startswith("utf"): 2868 self.multi_line_test(f, enc) 2869 f.close() 2870 2871 def multi_line_test(self, f, enc): 2872 f.seek(0) 2873 f.truncate() 2874 sample = "s\xff\u0fff\uffff" 2875 wlines = [] 2876 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2877 chars = [] 2878 for i in range(size): 2879 chars.append(sample[i % len(sample)]) 2880 line = "".join(chars) + "\n" 2881 wlines.append((f.tell(), line)) 2882 f.write(line) 2883 f.seek(0) 2884 rlines = [] 2885 while 
True: 2886 pos = f.tell() 2887 line = f.readline() 2888 if not line: 2889 break 2890 rlines.append((pos, line)) 2891 self.assertEqual(rlines, wlines) 2892 2893 def test_telling(self): 2894 f = self.open(support.TESTFN, "w+", encoding="utf-8") 2895 p0 = f.tell() 2896 f.write("\xff\n") 2897 p1 = f.tell() 2898 f.write("\xff\n") 2899 p2 = f.tell() 2900 f.seek(0) 2901 self.assertEqual(f.tell(), p0) 2902 self.assertEqual(f.readline(), "\xff\n") 2903 self.assertEqual(f.tell(), p1) 2904 self.assertEqual(f.readline(), "\xff\n") 2905 self.assertEqual(f.tell(), p2) 2906 f.seek(0) 2907 for line in f: 2908 self.assertEqual(line, "\xff\n") 2909 self.assertRaises(OSError, f.tell) 2910 self.assertEqual(f.tell(), p2) 2911 f.close() 2912 2913 def test_seeking(self): 2914 chunk_size = _default_chunk_size() 2915 prefix_size = chunk_size - 2 2916 u_prefix = "a" * prefix_size 2917 prefix = bytes(u_prefix.encode("utf-8")) 2918 self.assertEqual(len(u_prefix), len(prefix)) 2919 u_suffix = "\u8888\n" 2920 suffix = bytes(u_suffix.encode("utf-8")) 2921 line = prefix + suffix 2922 with self.open(support.TESTFN, "wb") as f: 2923 f.write(line*2) 2924 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2925 s = f.read(prefix_size) 2926 self.assertEqual(s, str(prefix, "ascii")) 2927 self.assertEqual(f.tell(), prefix_size) 2928 self.assertEqual(f.readline(), u_suffix) 2929 2930 def test_seeking_too(self): 2931 # Regression test for a specific bug 2932 data = b'\xe0\xbf\xbf\n' 2933 with self.open(support.TESTFN, "wb") as f: 2934 f.write(data) 2935 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2936 f._CHUNK_SIZE # Just test that it exists 2937 f._CHUNK_SIZE = 2 2938 f.readline() 2939 f.tell() 2940 2941 def test_seek_and_tell(self): 2942 #Test seek/tell using the StatefulIncrementalDecoder. 
2943 # Make test faster by doing smaller seeks 2944 CHUNK_SIZE = 128 2945 2946 def test_seek_and_tell_with_data(data, min_pos=0): 2947 """Tell/seek to various points within a data stream and ensure 2948 that the decoded data returned by read() is consistent.""" 2949 f = self.open(support.TESTFN, 'wb') 2950 f.write(data) 2951 f.close() 2952 f = self.open(support.TESTFN, encoding='test_decoder') 2953 f._CHUNK_SIZE = CHUNK_SIZE 2954 decoded = f.read() 2955 f.close() 2956 2957 for i in range(min_pos, len(decoded) + 1): # seek positions 2958 for j in [1, 5, len(decoded) - i]: # read lengths 2959 f = self.open(support.TESTFN, encoding='test_decoder') 2960 self.assertEqual(f.read(i), decoded[:i]) 2961 cookie = f.tell() 2962 self.assertEqual(f.read(j), decoded[i:i + j]) 2963 f.seek(cookie) 2964 self.assertEqual(f.read(), decoded[i:]) 2965 f.close() 2966 2967 # Enable the test decoder. 2968 StatefulIncrementalDecoder.codecEnabled = 1 2969 2970 # Run the tests. 2971 try: 2972 # Try each test case. 2973 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2974 test_seek_and_tell_with_data(input) 2975 2976 # Position each test case so that it crosses a chunk boundary. 2977 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2978 offset = CHUNK_SIZE - len(input)//2 2979 prefix = b'.'*offset 2980 # Don't bother seeking into the prefix (takes too long). 2981 min_pos = offset*2 2982 test_seek_and_tell_with_data(prefix + input, min_pos) 2983 2984 # Ensure our test decoder won't interfere with subsequent tests. 2985 finally: 2986 StatefulIncrementalDecoder.codecEnabled = 0 2987 2988 def test_encoded_writes(self): 2989 data = "1234567890" 2990 tests = ("utf-16", 2991 "utf-16-le", 2992 "utf-16-be", 2993 "utf-32", 2994 "utf-32-le", 2995 "utf-32-be") 2996 for encoding in tests: 2997 buf = self.BytesIO() 2998 f = self.TextIOWrapper(buf, encoding=encoding) 2999 # Check if the BOM is written only once (see issue1753). 
3000 f.write(data) 3001 f.write(data) 3002 f.seek(0) 3003 self.assertEqual(f.read(), data * 2) 3004 f.seek(0) 3005 self.assertEqual(f.read(), data * 2) 3006 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 3007 3008 def test_unreadable(self): 3009 class UnReadable(self.BytesIO): 3010 def readable(self): 3011 return False 3012 txt = self.TextIOWrapper(UnReadable()) 3013 self.assertRaises(OSError, txt.read) 3014 3015 def test_read_one_by_one(self): 3016 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 3017 reads = "" 3018 while True: 3019 c = txt.read(1) 3020 if not c: 3021 break 3022 reads += c 3023 self.assertEqual(reads, "AA\nBB") 3024 3025 def test_readlines(self): 3026 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 3027 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 3028 txt.seek(0) 3029 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 3030 txt.seek(0) 3031 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 3032 3033 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 3034 def test_read_by_chunk(self): 3035 # make sure "\r\n" straddles 128 char boundary. 
3036 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 3037 reads = "" 3038 while True: 3039 c = txt.read(128) 3040 if not c: 3041 break 3042 reads += c 3043 self.assertEqual(reads, "A"*127+"\nB") 3044 3045 def test_writelines(self): 3046 l = ['ab', 'cd', 'ef'] 3047 buf = self.BytesIO() 3048 txt = self.TextIOWrapper(buf) 3049 txt.writelines(l) 3050 txt.flush() 3051 self.assertEqual(buf.getvalue(), b'abcdef') 3052 3053 def test_writelines_userlist(self): 3054 l = UserList(['ab', 'cd', 'ef']) 3055 buf = self.BytesIO() 3056 txt = self.TextIOWrapper(buf) 3057 txt.writelines(l) 3058 txt.flush() 3059 self.assertEqual(buf.getvalue(), b'abcdef') 3060 3061 def test_writelines_error(self): 3062 txt = self.TextIOWrapper(self.BytesIO()) 3063 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 3064 self.assertRaises(TypeError, txt.writelines, None) 3065 self.assertRaises(TypeError, txt.writelines, b'abc') 3066 3067 def test_issue1395_1(self): 3068 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3069 3070 # read one char at a time 3071 reads = "" 3072 while True: 3073 c = txt.read(1) 3074 if not c: 3075 break 3076 reads += c 3077 self.assertEqual(reads, self.normalized) 3078 3079 def test_issue1395_2(self): 3080 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3081 txt._CHUNK_SIZE = 4 3082 3083 reads = "" 3084 while True: 3085 c = txt.read(4) 3086 if not c: 3087 break 3088 reads += c 3089 self.assertEqual(reads, self.normalized) 3090 3091 def test_issue1395_3(self): 3092 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3093 txt._CHUNK_SIZE = 4 3094 3095 reads = txt.read(4) 3096 reads += txt.read(4) 3097 reads += txt.readline() 3098 reads += txt.readline() 3099 reads += txt.readline() 3100 self.assertEqual(reads, self.normalized) 3101 3102 def test_issue1395_4(self): 3103 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3104 txt._CHUNK_SIZE = 4 3105 3106 reads = 
txt.read(4) 3107 reads += txt.read() 3108 self.assertEqual(reads, self.normalized) 3109 3110 def test_issue1395_5(self): 3111 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3112 txt._CHUNK_SIZE = 4 3113 3114 reads = txt.read(4) 3115 pos = txt.tell() 3116 txt.seek(0) 3117 txt.seek(pos) 3118 self.assertEqual(txt.read(4), "BBB\n") 3119 3120 def test_issue2282(self): 3121 buffer = self.BytesIO(self.testdata) 3122 txt = self.TextIOWrapper(buffer, encoding="ascii") 3123 3124 self.assertEqual(buffer.seekable(), txt.seekable()) 3125 3126 def test_append_bom(self): 3127 # The BOM is not written again when appending to a non-empty file 3128 filename = support.TESTFN 3129 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3130 with self.open(filename, 'w', encoding=charset) as f: 3131 f.write('aaa') 3132 pos = f.tell() 3133 with self.open(filename, 'rb') as f: 3134 self.assertEqual(f.read(), 'aaa'.encode(charset)) 3135 3136 with self.open(filename, 'a', encoding=charset) as f: 3137 f.write('xxx') 3138 with self.open(filename, 'rb') as f: 3139 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3140 3141 def test_seek_bom(self): 3142 # Same test, but when seeking manually 3143 filename = support.TESTFN 3144 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3145 with self.open(filename, 'w', encoding=charset) as f: 3146 f.write('aaa') 3147 pos = f.tell() 3148 with self.open(filename, 'r+', encoding=charset) as f: 3149 f.seek(pos) 3150 f.write('zzz') 3151 f.seek(0) 3152 f.write('bbb') 3153 with self.open(filename, 'rb') as f: 3154 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 3155 3156 def test_seek_append_bom(self): 3157 # Same test, but first seek to the start and then to the end 3158 filename = support.TESTFN 3159 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3160 with self.open(filename, 'w', encoding=charset) as f: 3161 f.write('aaa') 3162 with self.open(filename, 'a', encoding=charset) as f: 3163 f.seek(0) 3164 f.seek(0, self.SEEK_END) 
3165 f.write('xxx') 3166 with self.open(filename, 'rb') as f: 3167 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3168 3169 def test_errors_property(self): 3170 with self.open(support.TESTFN, "w") as f: 3171 self.assertEqual(f.errors, "strict") 3172 with self.open(support.TESTFN, "w", errors="replace") as f: 3173 self.assertEqual(f.errors, "replace") 3174 3175 @support.no_tracing 3176 def test_threads_write(self): 3177 # Issue6750: concurrent writes could duplicate data 3178 event = threading.Event() 3179 with self.open(support.TESTFN, "w", buffering=1) as f: 3180 def run(n): 3181 text = "Thread%03d\n" % n 3182 event.wait() 3183 f.write(text) 3184 threads = [threading.Thread(target=run, args=(x,)) 3185 for x in range(20)] 3186 with support.start_threads(threads, event.set): 3187 time.sleep(0.02) 3188 with self.open(support.TESTFN) as f: 3189 content = f.read() 3190 for n in range(20): 3191 self.assertEqual(content.count("Thread%03d\n" % n), 1) 3192 3193 def test_flush_error_on_close(self): 3194 # Test that text file is closed despite failed flush 3195 # and that flush() is called before file closed. 
3196 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3197 closed = [] 3198 def bad_flush(): 3199 closed[:] = [txt.closed, txt.buffer.closed] 3200 raise OSError() 3201 txt.flush = bad_flush 3202 self.assertRaises(OSError, txt.close) # exception not swallowed 3203 self.assertTrue(txt.closed) 3204 self.assertTrue(txt.buffer.closed) 3205 self.assertTrue(closed) # flush() called 3206 self.assertFalse(closed[0]) # flush() called before file closed 3207 self.assertFalse(closed[1]) 3208 txt.flush = lambda: None # break reference loop 3209 3210 def test_close_error_on_close(self): 3211 buffer = self.BytesIO(self.testdata) 3212 def bad_flush(): 3213 raise OSError('flush') 3214 def bad_close(): 3215 raise OSError('close') 3216 buffer.close = bad_close 3217 txt = self.TextIOWrapper(buffer, encoding="ascii") 3218 txt.flush = bad_flush 3219 with self.assertRaises(OSError) as err: # exception not swallowed 3220 txt.close() 3221 self.assertEqual(err.exception.args, ('close',)) 3222 self.assertIsInstance(err.exception.__context__, OSError) 3223 self.assertEqual(err.exception.__context__.args, ('flush',)) 3224 self.assertFalse(txt.closed) 3225 3226 def test_nonnormalized_close_error_on_close(self): 3227 # Issue #21677 3228 buffer = self.BytesIO(self.testdata) 3229 def bad_flush(): 3230 raise non_existing_flush 3231 def bad_close(): 3232 raise non_existing_close 3233 buffer.close = bad_close 3234 txt = self.TextIOWrapper(buffer, encoding="ascii") 3235 txt.flush = bad_flush 3236 with self.assertRaises(NameError) as err: # exception not swallowed 3237 txt.close() 3238 self.assertIn('non_existing_close', str(err.exception)) 3239 self.assertIsInstance(err.exception.__context__, NameError) 3240 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3241 self.assertFalse(txt.closed) 3242 3243 def test_multi_close(self): 3244 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3245 txt.close() 3246 txt.close() 3247 txt.close() 3248 
self.assertRaises(ValueError, txt.flush) 3249 3250 def test_unseekable(self): 3251 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) 3252 self.assertRaises(self.UnsupportedOperation, txt.tell) 3253 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3254 3255 def test_readonly_attributes(self): 3256 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3257 buf = self.BytesIO(self.testdata) 3258 with self.assertRaises(AttributeError): 3259 txt.buffer = buf 3260 3261 def test_rawio(self): 3262 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3263 # that subprocess.Popen() can have the required unbuffered 3264 # semantics with universal_newlines=True. 3265 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3266 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3267 # Reads 3268 self.assertEqual(txt.read(4), 'abcd') 3269 self.assertEqual(txt.readline(), 'efghi\n') 3270 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3271 3272 def test_rawio_write_through(self): 3273 # Issue #12591: with write_through=True, writes don't need a flush 3274 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3275 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3276 write_through=True) 3277 txt.write('1') 3278 txt.write('23\n4') 3279 txt.write('5') 3280 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3281 3282 def test_bufio_write_through(self): 3283 # Issue #21396: write_through=True doesn't force a flush() 3284 # on the underlying binary buffered object. 
3285 flush_called, write_called = [], [] 3286 class BufferedWriter(self.BufferedWriter): 3287 def flush(self, *args, **kwargs): 3288 flush_called.append(True) 3289 return super().flush(*args, **kwargs) 3290 def write(self, *args, **kwargs): 3291 write_called.append(True) 3292 return super().write(*args, **kwargs) 3293 3294 rawio = self.BytesIO() 3295 data = b"a" 3296 bufio = BufferedWriter(rawio, len(data)*2) 3297 textio = self.TextIOWrapper(bufio, encoding='ascii', 3298 write_through=True) 3299 # write to the buffered io but don't overflow the buffer 3300 text = data.decode('ascii') 3301 textio.write(text) 3302 3303 # buffer.flush is not called with write_through=True 3304 self.assertFalse(flush_called) 3305 # buffer.write *is* called with write_through=True 3306 self.assertTrue(write_called) 3307 self.assertEqual(rawio.getvalue(), b"") # no flush 3308 3309 write_called = [] # reset 3310 textio.write(text * 10) # total content is larger than bufio buffer 3311 self.assertTrue(write_called) 3312 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3313 3314 def test_reconfigure_write_through(self): 3315 raw = self.MockRawIO([]) 3316 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3317 t.write('1') 3318 t.reconfigure(write_through=True) # implied flush 3319 self.assertEqual(t.write_through, True) 3320 self.assertEqual(b''.join(raw._write_stack), b'1') 3321 t.write('23') 3322 self.assertEqual(b''.join(raw._write_stack), b'123') 3323 t.reconfigure(write_through=False) 3324 self.assertEqual(t.write_through, False) 3325 t.write('45') 3326 t.flush() 3327 self.assertEqual(b''.join(raw._write_stack), b'12345') 3328 # Keeping default value 3329 t.reconfigure() 3330 t.reconfigure(write_through=None) 3331 self.assertEqual(t.write_through, False) 3332 t.reconfigure(write_through=True) 3333 t.reconfigure() 3334 t.reconfigure(write_through=None) 3335 self.assertEqual(t.write_through, True) 3336 3337 def test_read_nonbytes(self): 3338 # Issue #17106 3339 # Crash 
when underlying read() returns non-bytes 3340 t = self.TextIOWrapper(self.StringIO('a')) 3341 self.assertRaises(TypeError, t.read, 1) 3342 t = self.TextIOWrapper(self.StringIO('a')) 3343 self.assertRaises(TypeError, t.readline) 3344 t = self.TextIOWrapper(self.StringIO('a')) 3345 self.assertRaises(TypeError, t.read) 3346 3347 def test_illegal_encoder(self): 3348 # Issue 31271: Calling write() while the return value of encoder's 3349 # encode() is invalid shouldn't cause an assertion failure. 3350 rot13 = codecs.lookup("rot13") 3351 with support.swap_attr(rot13, '_is_text_encoding', True): 3352 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 3353 self.assertRaises(TypeError, t.write, 'bar') 3354 3355 def test_illegal_decoder(self): 3356 # Issue #17106 3357 # Bypass the early encoding check added in issue 20404 3358 def _make_illegal_wrapper(): 3359 quopri = codecs.lookup("quopri") 3360 quopri._is_text_encoding = True 3361 try: 3362 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3363 newline='\n', encoding="quopri") 3364 finally: 3365 quopri._is_text_encoding = False 3366 return t 3367 # Crash when decoder returns non-string 3368 t = _make_illegal_wrapper() 3369 self.assertRaises(TypeError, t.read, 1) 3370 t = _make_illegal_wrapper() 3371 self.assertRaises(TypeError, t.readline) 3372 t = _make_illegal_wrapper() 3373 self.assertRaises(TypeError, t.read) 3374 3375 # Issue 31243: calling read() while the return value of decoder's 3376 # getstate() is invalid should neither crash the interpreter nor 3377 # raise a SystemError. 
3378 def _make_very_illegal_wrapper(getstate_ret_val): 3379 class BadDecoder: 3380 def getstate(self): 3381 return getstate_ret_val 3382 def _get_bad_decoder(dummy): 3383 return BadDecoder() 3384 quopri = codecs.lookup("quopri") 3385 with support.swap_attr(quopri, 'incrementaldecoder', 3386 _get_bad_decoder): 3387 return _make_illegal_wrapper() 3388 t = _make_very_illegal_wrapper(42) 3389 self.assertRaises(TypeError, t.read, 42) 3390 t = _make_very_illegal_wrapper(()) 3391 self.assertRaises(TypeError, t.read, 42) 3392 t = _make_very_illegal_wrapper((1, 2)) 3393 self.assertRaises(TypeError, t.read, 42) 3394 3395 def _check_create_at_shutdown(self, **kwargs): 3396 # Issue #20037: creating a TextIOWrapper at shutdown 3397 # shouldn't crash the interpreter. 3398 iomod = self.io.__name__ 3399 code = """if 1: 3400 import codecs 3401 import {iomod} as io 3402 3403 # Avoid looking up codecs at shutdown 3404 codecs.lookup('utf-8') 3405 3406 class C: 3407 def __init__(self): 3408 self.buf = io.BytesIO() 3409 def __del__(self): 3410 io.TextIOWrapper(self.buf, **{kwargs}) 3411 print("ok") 3412 c = C() 3413 """.format(iomod=iomod, kwargs=kwargs) 3414 return assert_python_ok("-c", code) 3415 3416 @support.requires_type_collecting 3417 def test_create_at_shutdown_without_encoding(self): 3418 rc, out, err = self._check_create_at_shutdown() 3419 if err: 3420 # Can error out with a RuntimeError if the module state 3421 # isn't found. 
3422 self.assertIn(self.shutdown_error, err.decode()) 3423 else: 3424 self.assertEqual("ok", out.decode().strip()) 3425 3426 @support.requires_type_collecting 3427 def test_create_at_shutdown_with_encoding(self): 3428 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3429 errors='strict') 3430 self.assertFalse(err) 3431 self.assertEqual("ok", out.decode().strip()) 3432 3433 def test_read_byteslike(self): 3434 r = MemviewBytesIO(b'Just some random string\n') 3435 t = self.TextIOWrapper(r, 'utf-8') 3436 3437 # TextIOwrapper will not read the full string, because 3438 # we truncate it to a multiple of the native int size 3439 # so that we can construct a more complex memoryview. 3440 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3441 3442 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3443 3444 def test_issue22849(self): 3445 class F(object): 3446 def readable(self): return True 3447 def writable(self): return True 3448 def seekable(self): return True 3449 3450 for i in range(10): 3451 try: 3452 self.TextIOWrapper(F(), encoding='utf-8') 3453 except Exception: 3454 pass 3455 3456 F.tell = lambda x: 0 3457 t = self.TextIOWrapper(F(), encoding='utf-8') 3458 3459 def test_reconfigure_encoding_read(self): 3460 # latin1 -> utf8 3461 # (latin1 can decode utf-8 encoded string) 3462 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') 3463 raw = self.BytesIO(data) 3464 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3465 self.assertEqual(txt.readline(), 'abc\xe9\n') 3466 with self.assertRaises(self.UnsupportedOperation): 3467 txt.reconfigure(encoding='utf-8') 3468 with self.assertRaises(self.UnsupportedOperation): 3469 txt.reconfigure(newline=None) 3470 3471 def test_reconfigure_write_fromascii(self): 3472 # ascii has a specific encodefunc in the C implementation, 3473 # but utf-8-sig has not. Make sure that we get rid of the 3474 # cached encodefunc when we switch encoders. 
3475 raw = self.BytesIO() 3476 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3477 txt.write('foo\n') 3478 txt.reconfigure(encoding='utf-8-sig') 3479 txt.write('\xe9\n') 3480 txt.flush() 3481 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') 3482 3483 def test_reconfigure_write(self): 3484 # latin -> utf8 3485 raw = self.BytesIO() 3486 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3487 txt.write('abc\xe9\n') 3488 txt.reconfigure(encoding='utf-8') 3489 self.assertEqual(raw.getvalue(), b'abc\xe9\n') 3490 txt.write('d\xe9f\n') 3491 txt.flush() 3492 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') 3493 3494 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of 3495 # the file 3496 raw = self.BytesIO() 3497 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3498 txt.write('abc\n') 3499 txt.reconfigure(encoding='utf-8-sig') 3500 txt.write('d\xe9f\n') 3501 txt.flush() 3502 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') 3503 3504 def test_reconfigure_write_non_seekable(self): 3505 raw = self.BytesIO() 3506 raw.seekable = lambda: False 3507 raw.seek = None 3508 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3509 txt.write('abc\n') 3510 txt.reconfigure(encoding='utf-8-sig') 3511 txt.write('d\xe9f\n') 3512 txt.flush() 3513 3514 # If the raw stream is not seekable, there'll be a BOM 3515 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') 3516 3517 def test_reconfigure_defaults(self): 3518 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') 3519 txt.reconfigure(encoding=None) 3520 self.assertEqual(txt.encoding, 'ascii') 3521 self.assertEqual(txt.errors, 'replace') 3522 txt.write('LF\n') 3523 3524 txt.reconfigure(newline='\r\n') 3525 self.assertEqual(txt.encoding, 'ascii') 3526 self.assertEqual(txt.errors, 'replace') 3527 3528 txt.reconfigure(errors='ignore') 3529 self.assertEqual(txt.encoding, 'ascii') 3530 self.assertEqual(txt.errors, 'ignore') 3531 
txt.write('CRLF\n') 3532 3533 txt.reconfigure(encoding='utf-8', newline=None) 3534 self.assertEqual(txt.errors, 'strict') 3535 txt.seek(0) 3536 self.assertEqual(txt.read(), 'LF\nCRLF\n') 3537 3538 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') 3539 3540 def test_reconfigure_newline(self): 3541 raw = self.BytesIO(b'CR\rEOF') 3542 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3543 txt.reconfigure(newline=None) 3544 self.assertEqual(txt.readline(), 'CR\n') 3545 raw = self.BytesIO(b'CR\rEOF') 3546 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3547 txt.reconfigure(newline='') 3548 self.assertEqual(txt.readline(), 'CR\r') 3549 raw = self.BytesIO(b'CR\rLF\nEOF') 3550 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3551 txt.reconfigure(newline='\n') 3552 self.assertEqual(txt.readline(), 'CR\rLF\n') 3553 raw = self.BytesIO(b'LF\nCR\rEOF') 3554 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3555 txt.reconfigure(newline='\r') 3556 self.assertEqual(txt.readline(), 'LF\nCR\r') 3557 raw = self.BytesIO(b'CR\rCRLF\r\nEOF') 3558 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3559 txt.reconfigure(newline='\r\n') 3560 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') 3561 3562 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') 3563 txt.reconfigure(newline=None) 3564 txt.write('linesep\n') 3565 txt.reconfigure(newline='') 3566 txt.write('LF\n') 3567 txt.reconfigure(newline='\n') 3568 txt.write('LF\n') 3569 txt.reconfigure(newline='\r') 3570 txt.write('CR\n') 3571 txt.reconfigure(newline='\r\n') 3572 txt.write('CRLF\n') 3573 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' 3574 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) 3575 3576 def test_issue25862(self): 3577 # Assertion failures occurred in tell() after read() and write(). 
3578 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3579 t.read(1) 3580 t.read() 3581 t.tell() 3582 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3583 t.read(1) 3584 t.write('x') 3585 t.tell() 3586 3587 3588class MemviewBytesIO(io.BytesIO): 3589 '''A BytesIO object whose read method returns memoryviews 3590 rather than bytes''' 3591 3592 def read1(self, len_): 3593 return _to_memoryview(super().read1(len_)) 3594 3595 def read(self, len_): 3596 return _to_memoryview(super().read(len_)) 3597 3598def _to_memoryview(buf): 3599 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3600 3601 arr = array.array('i') 3602 idx = len(buf) - len(buf) % arr.itemsize 3603 arr.frombytes(buf[:idx]) 3604 return memoryview(arr) 3605 3606 3607class CTextIOWrapperTest(TextIOWrapperTest): 3608 io = io 3609 shutdown_error = "RuntimeError: could not find io module state" 3610 3611 def test_initialization(self): 3612 r = self.BytesIO(b"\xc3\xa9\n\n") 3613 b = self.BufferedReader(r, 1000) 3614 t = self.TextIOWrapper(b) 3615 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 3616 self.assertRaises(ValueError, t.read) 3617 3618 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3619 self.assertRaises(Exception, repr, t) 3620 3621 def test_garbage_collection(self): 3622 # C TextIOWrapper objects are collected, and collecting them flushes 3623 # all data to disk. 3624 # The Python version has __del__, so it ends in gc.garbage instead. 
3625 with support.check_warnings(('', ResourceWarning)): 3626 rawio = io.FileIO(support.TESTFN, "wb") 3627 b = self.BufferedWriter(rawio) 3628 t = self.TextIOWrapper(b, encoding="ascii") 3629 t.write("456def") 3630 t.x = t 3631 wr = weakref.ref(t) 3632 del t 3633 support.gc_collect() 3634 self.assertIsNone(wr(), wr) 3635 with self.open(support.TESTFN, "rb") as f: 3636 self.assertEqual(f.read(), b"456def") 3637 3638 def test_rwpair_cleared_before_textio(self): 3639 # Issue 13070: TextIOWrapper's finalization would crash when called 3640 # after the reference to the underlying BufferedRWPair's writer got 3641 # cleared by the GC. 3642 for i in range(1000): 3643 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3644 t1 = self.TextIOWrapper(b1, encoding="ascii") 3645 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3646 t2 = self.TextIOWrapper(b2, encoding="ascii") 3647 # circular references 3648 t1.buddy = t2 3649 t2.buddy = t1 3650 support.gc_collect() 3651 3652 def test_del__CHUNK_SIZE_SystemError(self): 3653 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') 3654 with self.assertRaises(AttributeError): 3655 del t._CHUNK_SIZE 3656 3657 3658class PyTextIOWrapperTest(TextIOWrapperTest): 3659 io = pyio 3660 shutdown_error = "LookupError: unknown encoding: ascii" 3661 3662 3663class IncrementalNewlineDecoderTest(unittest.TestCase): 3664 3665 def check_newline_decoding_utf8(self, decoder): 3666 # UTF-8 specific tests for a newline decoder 3667 def _check_decode(b, s, **kwargs): 3668 # We exercise getstate() / setstate() as well as decode() 3669 state = decoder.getstate() 3670 self.assertEqual(decoder.decode(b, **kwargs), s) 3671 decoder.setstate(state) 3672 self.assertEqual(decoder.decode(b, **kwargs), s) 3673 3674 _check_decode(b'\xe8\xa2\x88', "\u8888") 3675 3676 _check_decode(b'\xe8', "") 3677 _check_decode(b'\xa2', "") 3678 _check_decode(b'\x88', "\u8888") 3679 3680 _check_decode(b'\xe8', "") 3681 _check_decode(b'\xa2', "") 3682 
_check_decode(b'\x88', "\u8888") 3683 3684 _check_decode(b'\xe8', "") 3685 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3686 3687 decoder.reset() 3688 _check_decode(b'\n', "\n") 3689 _check_decode(b'\r', "") 3690 _check_decode(b'', "\n", final=True) 3691 _check_decode(b'\r', "\n", final=True) 3692 3693 _check_decode(b'\r', "") 3694 _check_decode(b'a', "\na") 3695 3696 _check_decode(b'\r\r\n', "\n\n") 3697 _check_decode(b'\r', "") 3698 _check_decode(b'\r', "\n") 3699 _check_decode(b'\na', "\na") 3700 3701 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3702 _check_decode(b'\xe8\xa2\x88', "\u8888") 3703 _check_decode(b'\n', "\n") 3704 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3705 _check_decode(b'\n', "\n") 3706 3707 def check_newline_decoding(self, decoder, encoding): 3708 result = [] 3709 if encoding is not None: 3710 encoder = codecs.getincrementalencoder(encoding)() 3711 def _decode_bytewise(s): 3712 # Decode one byte at a time 3713 for b in encoder.encode(s): 3714 result.append(decoder.decode(bytes([b]))) 3715 else: 3716 encoder = None 3717 def _decode_bytewise(s): 3718 # Decode one char at a time 3719 for c in s: 3720 result.append(decoder.decode(c)) 3721 self.assertEqual(decoder.newlines, None) 3722 _decode_bytewise("abc\n\r") 3723 self.assertEqual(decoder.newlines, '\n') 3724 _decode_bytewise("\nabc") 3725 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3726 _decode_bytewise("abc\r") 3727 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3728 _decode_bytewise("abc") 3729 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3730 _decode_bytewise("abc\r") 3731 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3732 decoder.reset() 3733 input = "abc" 3734 if encoder is not None: 3735 encoder.reset() 3736 input = encoder.encode(input) 3737 self.assertEqual(decoder.decode(input), "abc") 3738 self.assertEqual(decoder.newlines, None) 3739 3740 def test_newline_decoder(self): 3741 encodings = ( 3742 # None meaning the 
IncrementalNewlineDecoder takes unicode input 3743 # rather than bytes input 3744 None, 'utf-8', 'latin-1', 3745 'utf-16', 'utf-16-le', 'utf-16-be', 3746 'utf-32', 'utf-32-le', 'utf-32-be', 3747 ) 3748 for enc in encodings: 3749 decoder = enc and codecs.getincrementaldecoder(enc)() 3750 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3751 self.check_newline_decoding(decoder, enc) 3752 decoder = codecs.getincrementaldecoder("utf-8")() 3753 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3754 self.check_newline_decoding_utf8(decoder) 3755 self.assertRaises(TypeError, decoder.setstate, 42) 3756 3757 def test_newline_bytes(self): 3758 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3759 def _check(dec): 3760 self.assertEqual(dec.newlines, None) 3761 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3762 self.assertEqual(dec.newlines, None) 3763 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3764 self.assertEqual(dec.newlines, None) 3765 dec = self.IncrementalNewlineDecoder(None, translate=False) 3766 _check(dec) 3767 dec = self.IncrementalNewlineDecoder(None, translate=True) 3768 _check(dec) 3769 3770 def test_translate(self): 3771 # issue 35062 3772 for translate in (-2, -1, 1, 2): 3773 decoder = codecs.getincrementaldecoder("utf-8")() 3774 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3775 self.check_newline_decoding_utf8(decoder) 3776 decoder = codecs.getincrementaldecoder("utf-8")() 3777 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3778 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3779 3780class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3781 pass 3782 3783class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3784 pass 3785 3786 3787# XXX Tests for open() 3788 3789class MiscIOTest(unittest.TestCase): 3790 3791 def tearDown(self): 3792 support.unlink(support.TESTFN) 3793 3794 def test___all__(self): 3795 for name in self.io.__all__: 
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Check the ``mode`` and ``name`` attributes reported by each layer
        # (text wrapper, buffered object, raw file) for several open modes.
        f = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        # Mode "U" is opened under a DeprecationWarning check.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")
        f.close()

        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object wrapping the same descriptor reports the
        # descriptor number as its name; closefd=False keeps g.close()
        # from closing the descriptor owned by f.
        g = self.open(f.fileno(), "wb", closefd=False)
        self.assertEqual(g.mode, "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name, f.fileno())
        self.assertEqual(g.raw.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # for each combination of mode and buffering listed below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Methods that only exist on some layers are checked only
            # when the object actually provides them.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
                self.assertRaises(ValueError, f.read1)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            if hasattr(f, "readinto1"):
                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.readlines, 1)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass an empty payload of the type matching the mode
            # (bytes for binary files, str for text files).
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        # Build a reference cycle between a BlockingIOError and one of its
        # arguments, then check that the cycle is collected (the weak
        # reference is dead after a collection).
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of open file matches.

        *abcmodule* is any object exposing ``IOBase``, ``RawIOBase``,
        ``BufferedIOBase`` and ``TextIOBase`` attributes.
        """
        # Unbuffered binary file: raw only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Buffered binary file: buffered only.
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Text file: text only.
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that dropping an unclosed file emits a ResourceWarning.

        The arguments are forwarded to open().  The warning message must
        contain the repr() the file object had while it was alive.
        """
        # NOTE(review): this calls the builtin open() rather than
        # self.open(), so the implementation under test is not selected
        # by the subclass -- confirm whether that is intentional.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop the only reference and force a collection so the
            # warning is issued inside the assertWarns block.
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        # Raw, buffered and text files opened by name.
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Same check as _check_warn_on_dealloc(), on a pipe descriptor.

        Also checks that no ResourceWarning is emitted when the file is
        opened with closefd=False.
        """
        fds = []
        def cleanup_fds():
            # Some descriptors may already have been closed by the file
            # objects themselves; EBADF is therefore ignored.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with support.check_no_resource_warning(self):
            open(r, *args, closefd=False, **kwargs)

    def test_warn_on_dealloc_fd(self):
        # Raw, buffered and text files opened from a descriptor.
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")


    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            # Every pickle protocol must refuse with TypeError.
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    # The skip decorator is applied to the shared helper, so both callers
    # above are skipped when os.set_blocking() is unavailable.
    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe using buffers of *bufsize*.

        Everything successfully written on the write end must be read
        back, in order, from the read end.
        """
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    # Keep writing N-byte messages (one repeated lowercase
                    # letter each) until the write would block.
                    i = 0
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the part reported as written counts as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    # Drain the read end, then write a marker message.
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush what is still buffered, draining the read end each
            # time the flush reports that it would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect the remaining chunks until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())
4051 4052 def test_open_allargs(self): 4053 # there used to be a buffer overflow in the parser for rawmode 4054 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') 4055 4056 4057class CMiscIOTest(MiscIOTest): 4058 io = io 4059 4060 def test_readinto_buffer_overflow(self): 4061 # Issue #18025 4062 class BadReader(self.io.BufferedIOBase): 4063 def read(self, n=-1): 4064 return b'x' * 10**6 4065 bufio = BadReader() 4066 b = bytearray(2) 4067 self.assertRaises(ValueError, bufio.readinto, b) 4068 4069 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4070 # Issue #23309: deadlocks at shutdown should be avoided when a 4071 # daemon thread and the main thread both write to a file. 4072 code = """if 1: 4073 import sys 4074 import time 4075 import threading 4076 from test.support import SuppressCrashReport 4077 4078 file = sys.{stream_name} 4079 4080 def run(): 4081 while True: 4082 file.write('.') 4083 file.flush() 4084 4085 crash = SuppressCrashReport() 4086 crash.__enter__() 4087 # don't call __exit__(): the crash occurs at Python shutdown 4088 4089 thread = threading.Thread(target=run) 4090 thread.daemon = True 4091 thread.start() 4092 4093 time.sleep(0.5) 4094 file.write('!') 4095 file.flush() 4096 """.format_map(locals()) 4097 res, _ = run_python_until_end("-c", code) 4098 err = res.err.decode() 4099 if res.rc != 0: 4100 # Failure: should be a fatal error 4101 self.assertIn("Fatal Python error: could not acquire lock " 4102 "for <_io.BufferedWriter name='<{stream_name}>'> " 4103 "at interpreter shutdown, possibly due to " 4104 "daemon threads".format_map(locals()), 4105 err) 4106 else: 4107 self.assertFalse(err.strip('.!')) 4108 4109 def test_daemon_threads_shutdown_stdout_deadlock(self): 4110 self.check_daemon_threads_shutdown_deadlock('stdout') 4111 4112 def test_daemon_threads_shutdown_stderr_deadlock(self): 4113 self.check_daemon_threads_shutdown_deadlock('stderr') 4114 4115 4116class PyMiscIOTest(MiscIOTest): 4117 io = pyio 4118 4119 
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests of how the io layers behave when a blocking read or write is
    # interrupted by SIGALRM.  Subclasses set ``io`` to the implementation
    # under test.

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Restore whatever handler was installed before the test.
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the str or bytes unit repeated to build the payload,
        *bytes* is the byte string expected at the start of the pipe (the
        name shadows the builtin inside this method), and *fdopen_kwargs*
        are passed to ``self.io.open()`` for the pipe's write end.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so the cleanup below cannot fail with
        # UnboundLocalError (hiding the real error) if open() raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of write() being re-entered from a signal
        handler while a write() on the same object is in progress.

        *data* is the payload written on every call; *fdopen_kwargs* are
        passed to ``self.io.open()`` for the pipe's write end.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() into a str for
        comparison; *fdopen_kwargs* are passed to ``self.io.open()`` for
        the pipe's read end.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so a failing open() neither raises
        # UnboundLocalError in the cleanup nor leaks the pipe descriptors.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the one-character str or bytes unit repeated to build
        the payload; *fdopen_kwargs* are passed to ``self.io.open()`` for
        the pipe's write end.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that no error was recorded.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the cleanup below cannot fail with
        # UnboundLocalError (hiding the real error) if open() raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")


class CSignalsTest(SignalsTest):
    io = io

class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None


def load_tests(*args):
    """Build the test suite for this module.

    Before collecting the tests, the public names of the implementation
    under test (``io`` for classes whose name starts with "C", ``_pyio``
    for those starting with "Py") and the matching mock classes are set
    as attributes on each test class.  The positional arguments supplied
    by the unittest ``load_tests`` protocol are accepted and ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is looked up under its "C"/"Py"-prefixed global name but
    # exposed to the tests under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; a fresh TestLoader collects the same "test*" methods.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite

if __name__ == "__main__":
    unittest.main()