1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import sysconfig 32import threading 33import time 34import unittest 35import warnings 36import weakref 37from collections import deque, UserList 38from itertools import cycle, count 39from test import support 40from test.support.script_helper import assert_python_ok, run_python_until_end 41from test.support import FakePath 42 43import codecs 44import io # C implementation of io 45import _pyio as pyio # Python implementation of io 46 47try: 48 import ctypes 49except ImportError: 50 def byteslike(*pos, **kw): 51 return array.array("b", bytes(*pos, **kw)) 52else: 53 def byteslike(*pos, **kw): 54 """Create a bytes-like object having no string or sequence methods""" 55 data = bytes(*pos, **kw) 56 obj = EmptyStruct() 57 ctypes.resize(obj, len(data)) 58 memoryview(obj).cast("B")[:] = data 59 return obj 60 class EmptyStruct(ctypes.Structure): 61 pass 62 63_cflags = sysconfig.get_config_var('CFLAGS') or '' 64_config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 65MEMORY_SANITIZER = ( 66 '-fsanitize=memory' in _cflags or 67 '--with-memory-sanitizer' in _config_args 68) 69 70# Does io.IOBase finalizer log the exception if the close() method fails? 71# The exception is ignored silently by default in release build. 72IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode) 73 74 75def _default_chunk_size(): 76 """Get the default TextIOWrapper chunk size""" 77 with open(__file__, "r", encoding="latin-1") as f: 78 return f._CHUNK_SIZE 79 80 81class MockRawIOWithoutRead: 82 """A RawIO implementation without read(), so as to exercise the default 83 RawIO.read() which calls readinto().""" 84 85 def __init__(self, read_stack=()): 86 self._read_stack = list(read_stack) 87 self._write_stack = [] 88 self._reads = 0 89 self._extraneous_reads = 0 90 91 def write(self, b): 92 self._write_stack.append(bytes(b)) 93 return len(b) 94 95 def writable(self): 96 return True 97 98 def fileno(self): 99 return 42 100 101 def readable(self): 102 return True 103 104 def seekable(self): 105 return True 106 107 def seek(self, pos, whence): 108 return 0 # wrong but we gotta return something 109 110 def tell(self): 111 return 0 # same comment as above 112 113 def readinto(self, buf): 114 self._reads += 1 115 max_len = len(buf) 116 try: 117 data = self._read_stack[0] 118 except IndexError: 119 self._extraneous_reads += 1 120 return 0 121 if data is None: 122 del self._read_stack[0] 123 return None 124 n = len(data) 125 if len(data) <= max_len: 126 del self._read_stack[0] 127 buf[:n] = data 128 return n 129 else: 130 buf[:] = data[:max_len] 131 self._read_stack[0] = data[max_len:] 132 return max_len 133 134 def truncate(self, pos=None): 135 return pos 136 137class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 138 pass 139 140class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 141 pass 142 143 144class MockRawIO(MockRawIOWithoutRead): 145 146 def read(self, n=None): 147 self._reads += 1 148 try: 149 return self._read_stack.pop(0) 150 except: 151 self._extraneous_reads += 1 152 return b"" 153 154class CMockRawIO(MockRawIO, io.RawIOBase): 155 pass 156 157class PyMockRawIO(MockRawIO, pyio.RawIOBase): 158 pass 159 160 161class MisbehavedRawIO(MockRawIO): 162 def write(self, b): 163 return super().write(b) * 2 164 165 def read(self, n=None): 166 return super().read(n) * 2 167 168 def seek(self, pos, whence): 169 return -123 170 171 def tell(self): 172 return -456 173 174 def readinto(self, buf): 175 super().readinto(buf) 176 return len(buf) * 5 177 178class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 179 pass 180 181class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 182 pass 183 184 185class SlowFlushRawIO(MockRawIO): 186 def __init__(self): 187 super().__init__() 188 self.in_flush = threading.Event() 189 190 def flush(self): 191 self.in_flush.set() 192 time.sleep(0.25) 193 194class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 195 pass 196 197class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 198 pass 199 200 201class CloseFailureIO(MockRawIO): 202 closed = 0 203 204 def close(self): 205 if not self.closed: 206 self.closed = 1 207 raise OSError 208 209class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 210 pass 211 212class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 213 pass 214 215 216class MockFileIO: 217 218 def __init__(self, data): 219 self.read_history = [] 220 super().__init__(data) 221 222 def read(self, n=None): 223 res = super().read(n) 224 self.read_history.append(None if res is None else len(res)) 225 return res 226 227 def readinto(self, b): 228 res = super().readinto(b) 229 self.read_history.append(res) 230 return res 231 232class CMockFileIO(MockFileIO, io.BytesIO): 233 pass 234 235class PyMockFileIO(MockFileIO, pyio.BytesIO): 236 pass 237 238 239class MockUnseekableIO: 240 def seekable(self): 241 return False 242 243 def seek(self, *args): 244 raise self.UnsupportedOperation("not seekable") 245 246 def tell(self, *args): 247 raise self.UnsupportedOperation("not seekable") 248 249 def truncate(self, *args): 250 raise self.UnsupportedOperation("not seekable") 251 252class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 253 UnsupportedOperation = io.UnsupportedOperation 254 255class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 256 UnsupportedOperation = pyio.UnsupportedOperation 257 258 259class MockNonBlockWriterIO: 260 261 def __init__(self): 262 self._write_stack = [] 263 self._blocker_char = None 264 265 def pop_written(self): 266 s = b"".join(self._write_stack) 267 self._write_stack[:] = [] 268 return s 269 270 def block_on(self, char): 271 """Block when a given char is encountered.""" 272 self._blocker_char = char 273 274 def readable(self): 275 return True 276 277 def seekable(self): 278 return True 279 280 def seek(self, pos, whence=0): 281 # naive implementation, enough for tests 282 return 0 283 284 def writable(self): 285 return True 286 287 def write(self, b): 288 b = bytes(b) 289 n = -1 290 if self._blocker_char: 291 try: 292 n = b.index(self._blocker_char) 293 except ValueError: 294 pass 295 else: 296 if n > 0: 297 # write data up to the first blocker 298 self._write_stack.append(b[:n]) 299 return n 300 else: 301 # cancel blocker and indicate would block 302 self._blocker_char = None 303 return None 304 self._write_stack.append(b) 305 return len(b) 306 307class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 308 BlockingIOError = io.BlockingIOError 309 310class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 311 BlockingIOError = pyio.BlockingIOError 312 313 314class IOTest(unittest.TestCase): 315 316 def setUp(self): 317 support.unlink(support.TESTFN) 318 319 def tearDown(self): 320 support.unlink(support.TESTFN) 321 322 def write_ops(self, f): 323 self.assertEqual(f.write(b"blah."), 5) 324 f.truncate(0) 325 self.assertEqual(f.tell(), 5) 326 f.seek(0) 327 328 self.assertEqual(f.write(b"blah."), 5) 329 self.assertEqual(f.seek(0), 0) 330 self.assertEqual(f.write(b"Hello."), 6) 331 self.assertEqual(f.tell(), 6) 332 self.assertEqual(f.seek(-1, 1), 5) 333 self.assertEqual(f.tell(), 5) 334 buffer = bytearray(b" world\n\n\n") 335 self.assertEqual(f.write(buffer), 9) 336 buffer[:] = b"*" * 9 # Overwrite our copy of the data 337 self.assertEqual(f.seek(0), 0) 338 self.assertEqual(f.write(b"h"), 1) 339 self.assertEqual(f.seek(-1, 2), 13) 340 self.assertEqual(f.tell(), 13) 341 342 self.assertEqual(f.truncate(12), 12) 343 self.assertEqual(f.tell(), 13) 344 self.assertRaises(TypeError, f.seek, 0.0) 345 346 def read_ops(self, f, buffered=False): 347 data = f.read(5) 348 self.assertEqual(data, b"hello") 349 data = byteslike(data) 350 self.assertEqual(f.readinto(data), 5) 351 self.assertEqual(bytes(data), b" worl") 352 data = bytearray(5) 353 self.assertEqual(f.readinto(data), 2) 354 self.assertEqual(len(data), 5) 355 self.assertEqual(data[:2], b"d\n") 356 self.assertEqual(f.seek(0), 0) 357 self.assertEqual(f.read(20), b"hello world\n") 358 self.assertEqual(f.read(1), b"") 359 self.assertEqual(f.readinto(byteslike(b"x")), 0) 360 self.assertEqual(f.seek(-6, 2), 6) 361 self.assertEqual(f.read(5), b"world") 362 self.assertEqual(f.read(0), b"") 363 self.assertEqual(f.readinto(byteslike()), 0) 364 self.assertEqual(f.seek(-6, 1), 5) 365 self.assertEqual(f.read(5), b" worl") 366 self.assertEqual(f.tell(), 10) 367 self.assertRaises(TypeError, f.seek, 0.0) 368 if buffered: 369 f.seek(0) 370 self.assertEqual(f.read(), b"hello world\n") 371 f.seek(6) 372 self.assertEqual(f.read(), b"world\n") 373 self.assertEqual(f.read(), b"") 374 f.seek(0) 375 data = byteslike(5) 376 self.assertEqual(f.readinto1(data), 5) 377 self.assertEqual(bytes(data), b"hello") 378 379 LARGE = 2**31 380 381 def large_file_ops(self, f): 382 assert f.readable() 383 assert f.writable() 384 try: 385 self.assertEqual(f.seek(self.LARGE), self.LARGE) 386 except (OverflowError, ValueError): 387 self.skipTest("no largefile support") 388 self.assertEqual(f.tell(), self.LARGE) 389 self.assertEqual(f.write(b"xxx"), 3) 390 self.assertEqual(f.tell(), self.LARGE + 3) 391 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 392 self.assertEqual(f.truncate(), self.LARGE + 2) 393 self.assertEqual(f.tell(), self.LARGE + 2) 394 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 395 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 396 self.assertEqual(f.tell(), self.LARGE + 2) 397 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 398 self.assertEqual(f.seek(-1, 2), self.LARGE) 399 self.assertEqual(f.read(2), b"x") 400 401 def test_invalid_operations(self): 402 # Try writing on a file opened in read mode and vice-versa. 403 exc = self.UnsupportedOperation 404 for mode in ("w", "wb"): 405 with self.open(support.TESTFN, mode) as fp: 406 self.assertRaises(exc, fp.read) 407 self.assertRaises(exc, fp.readline) 408 with self.open(support.TESTFN, "wb", buffering=0) as fp: 409 self.assertRaises(exc, fp.read) 410 self.assertRaises(exc, fp.readline) 411 with self.open(support.TESTFN, "rb", buffering=0) as fp: 412 self.assertRaises(exc, fp.write, b"blah") 413 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 414 with self.open(support.TESTFN, "rb") as fp: 415 self.assertRaises(exc, fp.write, b"blah") 416 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 417 with self.open(support.TESTFN, "r") as fp: 418 self.assertRaises(exc, fp.write, "blah") 419 self.assertRaises(exc, fp.writelines, ["blah\n"]) 420 # Non-zero seeking from current or end pos 421 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 422 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 423 424 def test_optional_abilities(self): 425 # Test for OSError when optional APIs are not supported 426 # The purpose of this test is to try fileno(), reading, writing and 427 # seeking operations with various objects that indicate they do not 428 # support these operations. 429 430 def pipe_reader(): 431 [r, w] = os.pipe() 432 os.close(w) # So that read() is harmless 433 return self.FileIO(r, "r") 434 435 def pipe_writer(): 436 [r, w] = os.pipe() 437 self.addCleanup(os.close, r) 438 # Guarantee that we can write into the pipe without blocking 439 thread = threading.Thread(target=os.read, args=(r, 100)) 440 thread.start() 441 self.addCleanup(thread.join) 442 return self.FileIO(w, "w") 443 444 def buffered_reader(): 445 return self.BufferedReader(self.MockUnseekableIO()) 446 447 def buffered_writer(): 448 return self.BufferedWriter(self.MockUnseekableIO()) 449 450 def buffered_random(): 451 return self.BufferedRandom(self.BytesIO()) 452 453 def buffered_rw_pair(): 454 return self.BufferedRWPair(self.MockUnseekableIO(), 455 self.MockUnseekableIO()) 456 457 def text_reader(): 458 class UnseekableReader(self.MockUnseekableIO): 459 writable = self.BufferedIOBase.writable 460 write = self.BufferedIOBase.write 461 return self.TextIOWrapper(UnseekableReader(), "ascii") 462 463 def text_writer(): 464 class UnseekableWriter(self.MockUnseekableIO): 465 readable = self.BufferedIOBase.readable 466 read = self.BufferedIOBase.read 467 return self.TextIOWrapper(UnseekableWriter(), "ascii") 468 469 tests = ( 470 (pipe_reader, "fr"), (pipe_writer, "fw"), 471 (buffered_reader, "r"), (buffered_writer, "w"), 472 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 473 (text_reader, "r"), (text_writer, "w"), 474 (self.BytesIO, "rws"), (self.StringIO, "rws"), 475 ) 476 for [test, abilities] in tests: 477 with self.subTest(test), test() as obj: 478 readable = "r" in abilities 479 self.assertEqual(obj.readable(), readable) 480 writable = "w" in abilities 481 self.assertEqual(obj.writable(), writable) 482 483 if isinstance(obj, self.TextIOBase): 484 data = "3" 485 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 486 data = b"3" 487 else: 488 self.fail("Unknown base class") 489 490 if "f" in abilities: 491 obj.fileno() 492 else: 493 self.assertRaises(OSError, obj.fileno) 494 495 if readable: 496 obj.read(1) 497 obj.read() 498 else: 499 self.assertRaises(OSError, obj.read, 1) 500 self.assertRaises(OSError, obj.read) 501 502 if writable: 503 obj.write(data) 504 else: 505 self.assertRaises(OSError, obj.write, data) 506 507 if sys.platform.startswith("win") and test in ( 508 pipe_reader, pipe_writer): 509 # Pipes seem to appear as seekable on Windows 510 continue 511 seekable = "s" in abilities 512 self.assertEqual(obj.seekable(), seekable) 513 514 if seekable: 515 obj.tell() 516 obj.seek(0) 517 else: 518 self.assertRaises(OSError, obj.tell) 519 self.assertRaises(OSError, obj.seek, 0) 520 521 if writable and seekable: 522 obj.truncate() 523 obj.truncate(0) 524 else: 525 self.assertRaises(OSError, obj.truncate) 526 self.assertRaises(OSError, obj.truncate, 0) 527 528 def test_open_handles_NUL_chars(self): 529 fn_with_NUL = 'foo\0bar' 530 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') 531 532 bytes_fn = bytes(fn_with_NUL, 'ascii') 533 with warnings.catch_warnings(): 534 warnings.simplefilter("ignore", DeprecationWarning) 535 self.assertRaises(ValueError, self.open, bytes_fn, 'w') 536 537 def test_raw_file_io(self): 538 with self.open(support.TESTFN, "wb", buffering=0) as f: 539 self.assertEqual(f.readable(), False) 540 self.assertEqual(f.writable(), True) 541 self.assertEqual(f.seekable(), True) 542 self.write_ops(f) 543 with self.open(support.TESTFN, "rb", buffering=0) as f: 544 self.assertEqual(f.readable(), True) 545 self.assertEqual(f.writable(), False) 546 self.assertEqual(f.seekable(), True) 547 self.read_ops(f) 548 549 def test_buffered_file_io(self): 550 with self.open(support.TESTFN, "wb") as f: 551 self.assertEqual(f.readable(), False) 552 self.assertEqual(f.writable(), True) 553 self.assertEqual(f.seekable(), True) 554 self.write_ops(f) 555 with self.open(support.TESTFN, "rb") as f: 556 self.assertEqual(f.readable(), True) 557 self.assertEqual(f.writable(), False) 558 self.assertEqual(f.seekable(), True) 559 self.read_ops(f, True) 560 561 def test_readline(self): 562 with self.open(support.TESTFN, "wb") as f: 563 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 564 with self.open(support.TESTFN, "rb") as f: 565 self.assertEqual(f.readline(), b"abc\n") 566 self.assertEqual(f.readline(10), b"def\n") 567 self.assertEqual(f.readline(2), b"xy") 568 self.assertEqual(f.readline(4), b"zzy\n") 569 self.assertEqual(f.readline(), b"foo\x00bar\n") 570 self.assertEqual(f.readline(None), b"another line") 571 self.assertRaises(TypeError, f.readline, 5.3) 572 with self.open(support.TESTFN, "r") as f: 573 self.assertRaises(TypeError, f.readline, 5.3) 574 575 def test_readline_nonsizeable(self): 576 # Issue #30061 577 # Crash when readline() returns an object without __len__ 578 class R(self.IOBase): 579 def readline(self): 580 return None 581 self.assertRaises((TypeError, StopIteration), next, R()) 582 583 def test_next_nonsizeable(self): 584 # Issue #30061 585 # Crash when __next__() returns an object without __len__ 586 class R(self.IOBase): 587 def __next__(self): 588 return None 589 self.assertRaises(TypeError, R().readlines, 1) 590 591 def test_raw_bytes_io(self): 592 f = self.BytesIO() 593 self.write_ops(f) 594 data = f.getvalue() 595 self.assertEqual(data, b"hello world\n") 596 f = self.BytesIO(data) 597 self.read_ops(f, True) 598 599 def test_large_file_ops(self): 600 # On Windows and Mac OSX this test consumes large resources; It takes 601 # a long time to build the >2 GiB file and takes >2 GiB of disk space 602 # therefore the resource must be enabled to run this test. 603 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 604 support.requires( 605 'largefile', 606 'test requires %s bytes and a long time to run' % self.LARGE) 607 with self.open(support.TESTFN, "w+b", 0) as f: 608 self.large_file_ops(f) 609 with self.open(support.TESTFN, "w+b") as f: 610 self.large_file_ops(f) 611 612 def test_with_open(self): 613 for bufsize in (0, 100): 614 f = None 615 with self.open(support.TESTFN, "wb", bufsize) as f: 616 f.write(b"xxx") 617 self.assertEqual(f.closed, True) 618 f = None 619 try: 620 with self.open(support.TESTFN, "wb", bufsize) as f: 621 1/0 622 except ZeroDivisionError: 623 self.assertEqual(f.closed, True) 624 else: 625 self.fail("1/0 didn't raise an exception") 626 627 # issue 5008 628 def test_append_mode_tell(self): 629 with self.open(support.TESTFN, "wb") as f: 630 f.write(b"xxx") 631 with self.open(support.TESTFN, "ab", buffering=0) as f: 632 self.assertEqual(f.tell(), 3) 633 with self.open(support.TESTFN, "ab") as f: 634 self.assertEqual(f.tell(), 3) 635 with self.open(support.TESTFN, "a") as f: 636 self.assertGreater(f.tell(), 0) 637 638 def test_destructor(self): 639 record = [] 640 class MyFileIO(self.FileIO): 641 def __del__(self): 642 record.append(1) 643 try: 644 f = super().__del__ 645 except AttributeError: 646 pass 647 else: 648 f() 649 def close(self): 650 record.append(2) 651 super().close() 652 def flush(self): 653 record.append(3) 654 super().flush() 655 with support.check_warnings(('', ResourceWarning)): 656 f = MyFileIO(support.TESTFN, "wb") 657 f.write(b"xxx") 658 del f 659 support.gc_collect() 660 self.assertEqual(record, [1, 2, 3]) 661 with self.open(support.TESTFN, "rb") as f: 662 self.assertEqual(f.read(), b"xxx") 663 664 def _check_base_destructor(self, base): 665 record = [] 666 class MyIO(base): 667 def __init__(self): 668 # This exercises the availability of attributes on object 669 # destruction. 670 # (in the C version, close() is called by the tp_dealloc 671 # function, not by __del__) 672 self.on_del = 1 673 self.on_close = 2 674 self.on_flush = 3 675 def __del__(self): 676 record.append(self.on_del) 677 try: 678 f = super().__del__ 679 except AttributeError: 680 pass 681 else: 682 f() 683 def close(self): 684 record.append(self.on_close) 685 super().close() 686 def flush(self): 687 record.append(self.on_flush) 688 super().flush() 689 f = MyIO() 690 del f 691 support.gc_collect() 692 self.assertEqual(record, [1, 2, 3]) 693 694 def test_IOBase_destructor(self): 695 self._check_base_destructor(self.IOBase) 696 697 def test_RawIOBase_destructor(self): 698 self._check_base_destructor(self.RawIOBase) 699 700 def test_BufferedIOBase_destructor(self): 701 self._check_base_destructor(self.BufferedIOBase) 702 703 def test_TextIOBase_destructor(self): 704 self._check_base_destructor(self.TextIOBase) 705 706 def test_close_flushes(self): 707 with self.open(support.TESTFN, "wb") as f: 708 f.write(b"xxx") 709 with self.open(support.TESTFN, "rb") as f: 710 self.assertEqual(f.read(), b"xxx") 711 712 def test_array_writes(self): 713 a = array.array('i', range(10)) 714 n = len(a.tobytes()) 715 def check(f): 716 with f: 717 self.assertEqual(f.write(a), n) 718 f.writelines((a,)) 719 check(self.BytesIO()) 720 check(self.FileIO(support.TESTFN, "w")) 721 check(self.BufferedWriter(self.MockRawIO())) 722 check(self.BufferedRandom(self.MockRawIO())) 723 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 724 725 def test_closefd(self): 726 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 727 closefd=False) 728 729 def test_read_closed(self): 730 with self.open(support.TESTFN, "w") as f: 731 f.write("egg\n") 732 with self.open(support.TESTFN, "r") as f: 733 file = self.open(f.fileno(), "r", closefd=False) 734 self.assertEqual(file.read(), "egg\n") 735 file.seek(0) 736 file.close() 737 self.assertRaises(ValueError, file.read) 738 with self.open(support.TESTFN, "rb") as f: 739 file = self.open(f.fileno(), "rb", closefd=False) 740 self.assertEqual(file.read()[:3], b"egg") 741 file.close() 742 self.assertRaises(ValueError, file.readinto, bytearray(1)) 743 744 def test_no_closefd_with_filename(self): 745 # can't use closefd in combination with a file name 746 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 747 748 def test_closefd_attr(self): 749 with self.open(support.TESTFN, "wb") as f: 750 f.write(b"egg\n") 751 with self.open(support.TESTFN, "r") as f: 752 self.assertEqual(f.buffer.raw.closefd, True) 753 file = self.open(f.fileno(), "r", closefd=False) 754 self.assertEqual(file.buffer.raw.closefd, False) 755 756 def test_garbage_collection(self): 757 # FileIO objects are collected, and collecting them flushes 758 # all data to disk. 759 with support.check_warnings(('', ResourceWarning)): 760 f = self.FileIO(support.TESTFN, "wb") 761 f.write(b"abcxxx") 762 f.f = f 763 wr = weakref.ref(f) 764 del f 765 support.gc_collect() 766 self.assertIsNone(wr(), wr) 767 with self.open(support.TESTFN, "rb") as f: 768 self.assertEqual(f.read(), b"abcxxx") 769 770 def test_unbounded_file(self): 771 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 772 zero = "/dev/zero" 773 if not os.path.exists(zero): 774 self.skipTest("{0} does not exist".format(zero)) 775 if sys.maxsize > 0x7FFFFFFF: 776 self.skipTest("test can only run in a 32-bit address space") 777 if support.real_max_memuse < support._2G: 778 self.skipTest("test requires at least 2 GiB of memory") 779 with self.open(zero, "rb", buffering=0) as f: 780 self.assertRaises(OverflowError, f.read) 781 with self.open(zero, "rb") as f: 782 self.assertRaises(OverflowError, f.read) 783 with self.open(zero, "r") as f: 784 self.assertRaises(OverflowError, f.read) 785 786 def check_flush_error_on_close(self, *args, **kwargs): 787 # Test that the file is closed despite failed flush 788 # and that flush() is called before file closed. 789 f = self.open(*args, **kwargs) 790 closed = [] 791 def bad_flush(): 792 closed[:] = [f.closed] 793 raise OSError() 794 f.flush = bad_flush 795 self.assertRaises(OSError, f.close) # exception not swallowed 796 self.assertTrue(f.closed) 797 self.assertTrue(closed) # flush() called 798 self.assertFalse(closed[0]) # flush() called before file closed 799 f.flush = lambda: None # break reference loop 800 801 def test_flush_error_on_close(self): 802 # raw file 803 # Issue #5700: io.FileIO calls flush() after file closed 804 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 805 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 806 self.check_flush_error_on_close(fd, 'wb', buffering=0) 807 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 808 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 809 os.close(fd) 810 # buffered io 811 self.check_flush_error_on_close(support.TESTFN, 'wb') 812 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 813 self.check_flush_error_on_close(fd, 'wb') 814 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 815 self.check_flush_error_on_close(fd, 'wb', closefd=False) 816 os.close(fd) 817 # text io 818 self.check_flush_error_on_close(support.TESTFN, 'w') 819 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 820 self.check_flush_error_on_close(fd, 'w') 821 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 822 self.check_flush_error_on_close(fd, 'w', closefd=False) 823 os.close(fd) 824 825 def test_multi_close(self): 826 f = self.open(support.TESTFN, "wb", buffering=0) 827 f.close() 828 f.close() 829 f.close() 830 self.assertRaises(ValueError, f.flush) 831 832 def test_RawIOBase_read(self): 833 # Exercise the default limited RawIOBase.read(n) implementation (which 834 # calls readinto() internally). 835 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 836 self.assertEqual(rawio.read(2), b"ab") 837 self.assertEqual(rawio.read(2), b"c") 838 self.assertEqual(rawio.read(2), b"d") 839 self.assertEqual(rawio.read(2), None) 840 self.assertEqual(rawio.read(2), b"ef") 841 self.assertEqual(rawio.read(2), b"g") 842 self.assertEqual(rawio.read(2), None) 843 self.assertEqual(rawio.read(2), b"") 844 845 def test_types_have_dict(self): 846 test = ( 847 self.IOBase(), 848 self.RawIOBase(), 849 self.TextIOBase(), 850 self.StringIO(), 851 self.BytesIO() 852 ) 853 for obj in test: 854 self.assertTrue(hasattr(obj, "__dict__")) 855 856 def test_opener(self): 857 with self.open(support.TESTFN, "w") as f: 858 f.write("egg\n") 859 fd = os.open(support.TESTFN, os.O_RDONLY) 860 def opener(path, flags): 861 return fd 862 with self.open("non-existent", "r", opener=opener) as f: 863 self.assertEqual(f.read(), "egg\n") 864 865 def test_bad_opener_negative_1(self): 866 # Issue #27066. 867 def badopener(fname, flags): 868 return -1 869 with self.assertRaises(ValueError) as cm: 870 open('non-existent', 'r', opener=badopener) 871 self.assertEqual(str(cm.exception), 'opener returned -1') 872 873 def test_bad_opener_other_negative(self): 874 # Issue #27066. 875 def badopener(fname, flags): 876 return -2 877 with self.assertRaises(ValueError) as cm: 878 open('non-existent', 'r', opener=badopener) 879 self.assertEqual(str(cm.exception), 'opener returned -2') 880 881 def test_fileio_closefd(self): 882 # Issue #4841 883 with self.open(__file__, 'rb') as f1, \ 884 self.open(__file__, 'rb') as f2: 885 fileio = self.FileIO(f1.fileno(), closefd=False) 886 # .__init__() must not close f1 887 fileio.__init__(f2.fileno(), closefd=False) 888 f1.readline() 889 # .close() must not close f2 890 fileio.close() 891 f2.readline() 892 893 def test_nonbuffered_textio(self): 894 with support.check_no_resource_warning(self): 895 with self.assertRaises(ValueError): 896 self.open(support.TESTFN, 'w', buffering=0) 897 898 def test_invalid_newline(self): 899 with support.check_no_resource_warning(self): 900 with self.assertRaises(ValueError): 901 self.open(support.TESTFN, 'w', newline='invalid') 902 903 def test_buffered_readinto_mixin(self): 904 # Test the implementation provided by BufferedIOBase 905 class Stream(self.BufferedIOBase): 906 def read(self, size): 907 return b"12345" 908 read1 = read 909 stream = Stream() 910 for method in ("readinto", "readinto1"): 911 with self.subTest(method): 912 buffer = byteslike(5) 913 self.assertEqual(getattr(stream, method)(buffer), 5) 914 self.assertEqual(bytes(buffer), b"12345") 915 916 def test_fspath_support(self): 917 def check_path_succeeds(path): 918 with self.open(path, "w") as f: 919 f.write("egg\n") 920 921 with self.open(path, "r") as f: 922 self.assertEqual(f.read(), "egg\n") 923 924 check_path_succeeds(FakePath(support.TESTFN)) 925 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8'))) 926 927 with self.open(support.TESTFN, "w") as f: 928 bad_path = FakePath(f.fileno()) 929 with self.assertRaises(TypeError): 930 self.open(bad_path, 'w') 931 932 bad_path = FakePath(None) 933 with self.assertRaises(TypeError): 934 self.open(bad_path, 'w') 935 936 bad_path = FakePath(FloatingPointError) 937 with self.assertRaises(FloatingPointError): 938 self.open(bad_path, 'w') 939 940 # ensure that refcounting is correct with some error conditions 941 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 942 self.open(FakePath(support.TESTFN), 'rwxa') 943 944 def test_RawIOBase_readall(self): 945 # Exercise the default unlimited RawIOBase.read() and readall() 946 # implementations. 947 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 948 self.assertEqual(rawio.read(), b"abcdefg") 949 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 950 self.assertEqual(rawio.readall(), b"abcdefg") 951 952 def test_BufferedIOBase_readinto(self): 953 # Exercise the default BufferedIOBase.readinto() and readinto1() 954 # implementations (which call read() or read1() internally). 955 class Reader(self.BufferedIOBase): 956 def __init__(self, avail): 957 self.avail = avail 958 def read(self, size): 959 result = self.avail[:size] 960 self.avail = self.avail[size:] 961 return result 962 def read1(self, size): 963 """Returns no more than 5 bytes at once""" 964 return self.read(min(size, 5)) 965 tests = ( 966 # (test method, total data available, read buffer size, expected 967 # read size) 968 ("readinto", 10, 5, 5), 969 ("readinto", 10, 6, 6), # More than read1() can return 970 ("readinto", 5, 6, 5), # Buffer larger than total available 971 ("readinto", 6, 7, 6), 972 ("readinto", 10, 0, 0), # Empty buffer 973 ("readinto1", 10, 5, 5), # Result limited to single read1() call 974 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 975 ("readinto1", 5, 6, 5), # Buffer larger than total available 976 ("readinto1", 6, 7, 5), 977 ("readinto1", 10, 0, 0), # Empty buffer 978 ) 979 UNUSED_BYTE = 0x81 980 for test in tests: 981 with self.subTest(test): 982 method, avail, request, result = test 983 reader = Reader(bytes(range(avail))) 984 buffer = bytearray((UNUSED_BYTE,) * request) 985 method = getattr(reader, method) 986 self.assertEqual(method(buffer), result) 987 self.assertEqual(len(buffer), request) 988 self.assertSequenceEqual(buffer[:result], range(result)) 989 unused = (UNUSED_BYTE,) * (request - result) 990 self.assertSequenceEqual(buffer[result:], unused) 991 self.assertEqual(len(reader.avail), avail - result) 992 993 def test_close_assert(self): 994 class R(self.IOBase): 995 def __setattr__(self, name, value): 996 pass 997 def flush(self): 998 raise OSError() 999 f = R() 1000 # This would cause an assertion failure. 1001 self.assertRaises(OSError, f.close) 1002 1003 # Silence destructor error 1004 R.flush = lambda self: None 1005 1006 1007class CIOTest(IOTest): 1008 1009 def test_IOBase_finalize(self): 1010 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 1011 # class which inherits IOBase and an object of this class are caught 1012 # in a reference cycle and close() is already in the method cache. 1013 class MyIO(self.IOBase): 1014 def close(self): 1015 pass 1016 1017 # create an instance to populate the method cache 1018 MyIO() 1019 obj = MyIO() 1020 obj.obj = obj 1021 wr = weakref.ref(obj) 1022 del MyIO 1023 del obj 1024 support.gc_collect() 1025 self.assertIsNone(wr(), wr) 1026 1027class PyIOTest(IOTest): 1028 pass 1029 1030 1031@support.cpython_only 1032class APIMismatchTest(unittest.TestCase): 1033 1034 def test_RawIOBase_io_in_pyio_match(self): 1035 """Test that pyio RawIOBase class has all c RawIOBase methods""" 1036 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, 1037 ignore=('__weakref__',)) 1038 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') 1039 1040 def test_RawIOBase_pyio_in_io_match(self): 1041 """Test that c RawIOBase class has all pyio RawIOBase methods""" 1042 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) 1043 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') 1044 1045 1046class CommonBufferedTests: 1047 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 1048 1049 def test_detach(self): 1050 raw = self.MockRawIO() 1051 buf = self.tp(raw) 1052 self.assertIs(buf.detach(), raw) 1053 self.assertRaises(ValueError, buf.detach) 1054 1055 repr(buf) # Should still work 1056 1057 def test_fileno(self): 1058 rawio = self.MockRawIO() 1059 bufio = self.tp(rawio) 1060 1061 self.assertEqual(42, bufio.fileno()) 1062 1063 def test_invalid_args(self): 1064 rawio = self.MockRawIO() 1065 bufio = self.tp(rawio) 1066 # Invalid whence 1067 self.assertRaises(ValueError, bufio.seek, 0, -1) 1068 self.assertRaises(ValueError, bufio.seek, 0, 9) 1069 1070 def test_override_destructor(self): 1071 tp = self.tp 1072 record = [] 1073 class MyBufferedIO(tp): 1074 def __del__(self): 1075 record.append(1) 1076 try: 1077 f = super().__del__ 1078 except AttributeError: 1079 pass 1080 else: 1081 f() 1082 def close(self): 1083 record.append(2) 1084 super().close() 1085 def flush(self): 1086 record.append(3) 1087 super().flush() 1088 rawio = self.MockRawIO() 1089 bufio = MyBufferedIO(rawio) 1090 del bufio 1091 support.gc_collect() 1092 self.assertEqual(record, [1, 2, 3]) 1093 1094 def test_context_manager(self): 1095 # Test usability as a context manager 1096 rawio = self.MockRawIO() 1097 bufio = self.tp(rawio) 1098 def _with(): 1099 with bufio: 1100 pass 1101 _with() 1102 # bufio should now be closed, and using it a second time should raise 1103 # a ValueError. 1104 self.assertRaises(ValueError, _with) 1105 1106 def test_error_through_destructor(self): 1107 # Test that the exception state is not modified by a destructor, 1108 # even if close() fails. 1109 rawio = self.CloseFailureIO() 1110 with support.catch_unraisable_exception() as cm: 1111 with self.assertRaises(AttributeError): 1112 self.tp(rawio).xyzzy 1113 1114 if not IOBASE_EMITS_UNRAISABLE: 1115 self.assertIsNone(cm.unraisable) 1116 elif cm.unraisable is not None: 1117 self.assertEqual(cm.unraisable.exc_type, OSError) 1118 1119 def test_repr(self): 1120 raw = self.MockRawIO() 1121 b = self.tp(raw) 1122 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__) 1123 self.assertRegex(repr(b), "<%s>" % clsname) 1124 raw.name = "dummy" 1125 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname) 1126 raw.name = b"dummy" 1127 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname) 1128 1129 def test_recursive_repr(self): 1130 # Issue #25455 1131 raw = self.MockRawIO() 1132 b = self.tp(raw) 1133 with support.swap_attr(raw, 'name', b): 1134 try: 1135 repr(b) # Should not crash 1136 except RuntimeError: 1137 pass 1138 1139 def test_flush_error_on_close(self): 1140 # Test that buffered file is closed despite failed flush 1141 # and that flush() is called before file closed. 1142 raw = self.MockRawIO() 1143 closed = [] 1144 def bad_flush(): 1145 closed[:] = [b.closed, raw.closed] 1146 raise OSError() 1147 raw.flush = bad_flush 1148 b = self.tp(raw) 1149 self.assertRaises(OSError, b.close) # exception not swallowed 1150 self.assertTrue(b.closed) 1151 self.assertTrue(raw.closed) 1152 self.assertTrue(closed) # flush() called 1153 self.assertFalse(closed[0]) # flush() called before file closed 1154 self.assertFalse(closed[1]) 1155 raw.flush = lambda: None # break reference loop 1156 1157 def test_close_error_on_close(self): 1158 raw = self.MockRawIO() 1159 def bad_flush(): 1160 raise OSError('flush') 1161 def bad_close(): 1162 raise OSError('close') 1163 raw.close = bad_close 1164 b = self.tp(raw) 1165 b.flush = bad_flush 1166 with self.assertRaises(OSError) as err: # exception not swallowed 1167 b.close() 1168 self.assertEqual(err.exception.args, ('close',)) 1169 self.assertIsInstance(err.exception.__context__, OSError) 1170 self.assertEqual(err.exception.__context__.args, ('flush',)) 1171 self.assertFalse(b.closed) 1172 1173 # Silence destructor error 1174 raw.close = lambda: None 1175 b.flush = lambda: None 1176 1177 def test_nonnormalized_close_error_on_close(self): 1178 # Issue #21677 1179 raw = self.MockRawIO() 1180 def bad_flush(): 1181 raise non_existing_flush 1182 def bad_close(): 1183 raise non_existing_close 1184 raw.close = bad_close 1185 b = self.tp(raw) 1186 b.flush = bad_flush 1187 with self.assertRaises(NameError) as err: # exception not swallowed 1188 b.close() 1189 self.assertIn('non_existing_close', str(err.exception)) 1190 self.assertIsInstance(err.exception.__context__, NameError) 1191 self.assertIn('non_existing_flush', str(err.exception.__context__)) 1192 self.assertFalse(b.closed) 1193 1194 # Silence destructor error 1195 b.flush = lambda: None 1196 raw.close = lambda: None 1197 1198 def test_multi_close(self): 1199 raw = self.MockRawIO() 1200 b = self.tp(raw) 1201 b.close() 1202 b.close() 1203 b.close() 1204 self.assertRaises(ValueError, b.flush) 1205 1206 def test_unseekable(self): 1207 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1208 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1209 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1210 1211 def test_readonly_attributes(self): 1212 raw = self.MockRawIO() 1213 buf = self.tp(raw) 1214 x = self.MockRawIO() 1215 with self.assertRaises(AttributeError): 1216 buf.raw = x 1217 1218 1219class SizeofTest: 1220 1221 @support.cpython_only 1222 def test_sizeof(self): 1223 bufsize1 = 4096 1224 bufsize2 = 8192 1225 rawio = self.MockRawIO() 1226 bufio = self.tp(rawio, buffer_size=bufsize1) 1227 size = sys.getsizeof(bufio) - bufsize1 1228 rawio = self.MockRawIO() 1229 bufio = self.tp(rawio, buffer_size=bufsize2) 1230 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 1231 1232 @support.cpython_only 1233 def test_buffer_freeing(self) : 1234 bufsize = 4096 1235 rawio = self.MockRawIO() 1236 bufio = self.tp(rawio, buffer_size=bufsize) 1237 size = sys.getsizeof(bufio) - bufsize 1238 bufio.close() 1239 self.assertEqual(sys.getsizeof(bufio), size) 1240 1241class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 1242 read_mode = "rb" 1243 1244 def test_constructor(self): 1245 rawio = self.MockRawIO([b"abc"]) 1246 bufio = self.tp(rawio) 1247 bufio.__init__(rawio) 1248 bufio.__init__(rawio, buffer_size=1024) 1249 bufio.__init__(rawio, buffer_size=16) 1250 self.assertEqual(b"abc", bufio.read()) 1251 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1252 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1253 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1254 rawio = self.MockRawIO([b"abc"]) 1255 bufio.__init__(rawio) 1256 self.assertEqual(b"abc", bufio.read()) 1257 1258 def test_uninitialized(self): 1259 bufio = self.tp.__new__(self.tp) 1260 del bufio 1261 bufio = self.tp.__new__(self.tp) 1262 self.assertRaisesRegex((ValueError, AttributeError), 1263 'uninitialized|has no attribute', 1264 bufio.read, 0) 1265 bufio.__init__(self.MockRawIO()) 1266 self.assertEqual(bufio.read(0), b'') 1267 1268 def test_read(self): 1269 for arg in (None, 7): 1270 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1271 bufio = self.tp(rawio) 1272 self.assertEqual(b"abcdefg", bufio.read(arg)) 1273 # Invalid args 1274 self.assertRaises(ValueError, bufio.read, -2) 1275 1276 def test_read1(self): 1277 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1278 bufio = self.tp(rawio) 1279 self.assertEqual(b"a", bufio.read(1)) 1280 self.assertEqual(b"b", bufio.read1(1)) 1281 self.assertEqual(rawio._reads, 1) 1282 self.assertEqual(b"", bufio.read1(0)) 1283 self.assertEqual(b"c", bufio.read1(100)) 1284 self.assertEqual(rawio._reads, 1) 1285 self.assertEqual(b"d", bufio.read1(100)) 1286 self.assertEqual(rawio._reads, 2) 1287 self.assertEqual(b"efg", bufio.read1(100)) 1288 self.assertEqual(rawio._reads, 3) 1289 self.assertEqual(b"", bufio.read1(100)) 1290 self.assertEqual(rawio._reads, 4) 1291 1292 def test_read1_arbitrary(self): 1293 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1294 bufio = self.tp(rawio) 1295 self.assertEqual(b"a", bufio.read(1)) 1296 self.assertEqual(b"bc", bufio.read1()) 1297 self.assertEqual(b"d", bufio.read1()) 1298 self.assertEqual(b"efg", bufio.read1(-1)) 1299 self.assertEqual(rawio._reads, 3) 1300 self.assertEqual(b"", bufio.read1()) 1301 self.assertEqual(rawio._reads, 4) 1302 1303 def test_readinto(self): 1304 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1305 bufio = self.tp(rawio) 1306 b = bytearray(2) 1307 self.assertEqual(bufio.readinto(b), 2) 1308 self.assertEqual(b, b"ab") 1309 self.assertEqual(bufio.readinto(b), 2) 1310 self.assertEqual(b, b"cd") 1311 self.assertEqual(bufio.readinto(b), 2) 1312 self.assertEqual(b, b"ef") 1313 self.assertEqual(bufio.readinto(b), 1) 1314 self.assertEqual(b, b"gf") 1315 self.assertEqual(bufio.readinto(b), 0) 1316 self.assertEqual(b, b"gf") 1317 rawio = self.MockRawIO((b"abc", None)) 1318 bufio = self.tp(rawio) 1319 self.assertEqual(bufio.readinto(b), 2) 1320 self.assertEqual(b, b"ab") 1321 self.assertEqual(bufio.readinto(b), 1) 1322 self.assertEqual(b, b"cb") 1323 1324 def test_readinto1(self): 1325 buffer_size = 10 1326 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) 1327 bufio = self.tp(rawio, buffer_size=buffer_size) 1328 b = bytearray(2) 1329 self.assertEqual(bufio.peek(3), b'abc') 1330 self.assertEqual(rawio._reads, 1) 1331 self.assertEqual(bufio.readinto1(b), 2) 1332 self.assertEqual(b, b"ab") 1333 self.assertEqual(rawio._reads, 1) 1334 self.assertEqual(bufio.readinto1(b), 1) 1335 self.assertEqual(b[:1], b"c") 1336 self.assertEqual(rawio._reads, 1) 1337 self.assertEqual(bufio.readinto1(b), 2) 1338 self.assertEqual(b, b"de") 1339 self.assertEqual(rawio._reads, 2) 1340 b = bytearray(2*buffer_size) 1341 self.assertEqual(bufio.peek(3), b'fgh') 1342 self.assertEqual(rawio._reads, 3) 1343 self.assertEqual(bufio.readinto1(b), 6) 1344 self.assertEqual(b[:6], b"fghjkl") 1345 self.assertEqual(rawio._reads, 4) 1346 1347 def test_readinto_array(self): 1348 buffer_size = 60 1349 data = b"a" * 26 1350 rawio = self.MockRawIO((data,)) 1351 bufio = self.tp(rawio, buffer_size=buffer_size) 1352 1353 # Create an array with element size > 1 byte 1354 b = array.array('i', b'x' * 32) 1355 assert len(b) != 16 1356 1357 # Read into it. We should get as many *bytes* as we can fit into b 1358 # (which is more than the number of elements) 1359 n = bufio.readinto(b) 1360 self.assertGreater(n, len(b)) 1361 1362 # Check that old contents of b are preserved 1363 bm = memoryview(b).cast('B') 1364 self.assertLess(n, len(bm)) 1365 self.assertEqual(bm[:n], data[:n]) 1366 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1367 1368 def test_readinto1_array(self): 1369 buffer_size = 60 1370 data = b"a" * 26 1371 rawio = self.MockRawIO((data,)) 1372 bufio = self.tp(rawio, buffer_size=buffer_size) 1373 1374 # Create an array with element size > 1 byte 1375 b = array.array('i', b'x' * 32) 1376 assert len(b) != 16 1377 1378 # Read into it. We should get as many *bytes* as we can fit into b 1379 # (which is more than the number of elements) 1380 n = bufio.readinto1(b) 1381 self.assertGreater(n, len(b)) 1382 1383 # Check that old contents of b are preserved 1384 bm = memoryview(b).cast('B') 1385 self.assertLess(n, len(bm)) 1386 self.assertEqual(bm[:n], data[:n]) 1387 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1388 1389 def test_readlines(self): 1390 def bufio(): 1391 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 1392 return self.tp(rawio) 1393 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 1394 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 1395 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 1396 1397 def test_buffering(self): 1398 data = b"abcdefghi" 1399 dlen = len(data) 1400 1401 tests = [ 1402 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 1403 [ 100, [ 3, 3, 3], [ dlen ] ], 1404 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 1405 ] 1406 1407 for bufsize, buf_read_sizes, raw_read_sizes in tests: 1408 rawio = self.MockFileIO(data) 1409 bufio = self.tp(rawio, buffer_size=bufsize) 1410 pos = 0 1411 for nbytes in buf_read_sizes: 1412 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 1413 pos += nbytes 1414 # this is mildly implementation-dependent 1415 self.assertEqual(rawio.read_history, raw_read_sizes) 1416 1417 def test_read_non_blocking(self): 1418 # Inject some None's in there to simulate EWOULDBLOCK 1419 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 1420 bufio = self.tp(rawio) 1421 self.assertEqual(b"abcd", bufio.read(6)) 1422 self.assertEqual(b"e", bufio.read(1)) 1423 self.assertEqual(b"fg", bufio.read()) 1424 self.assertEqual(b"", bufio.peek(1)) 1425 self.assertIsNone(bufio.read()) 1426 self.assertEqual(b"", bufio.read()) 1427 1428 rawio = self.MockRawIO((b"a", None, None)) 1429 self.assertEqual(b"a", rawio.readall()) 1430 self.assertIsNone(rawio.readall()) 1431 1432 def test_read_past_eof(self): 1433 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1434 bufio = self.tp(rawio) 1435 1436 self.assertEqual(b"abcdefg", bufio.read(9000)) 1437 1438 def test_read_all(self): 1439 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1440 bufio = self.tp(rawio) 1441 1442 self.assertEqual(b"abcdefg", bufio.read()) 1443 1444 @support.requires_resource('cpu') 1445 def test_threads(self): 1446 try: 1447 # Write out many bytes with exactly the same number of 0's, 1448 # 1's... 255's. This will help us check that concurrent reading 1449 # doesn't duplicate or forget contents. 1450 N = 1000 1451 l = list(range(256)) * N 1452 random.shuffle(l) 1453 s = bytes(bytearray(l)) 1454 with self.open(support.TESTFN, "wb") as f: 1455 f.write(s) 1456 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 1457 bufio = self.tp(raw, 8) 1458 errors = [] 1459 results = [] 1460 def f(): 1461 try: 1462 # Intra-buffer read then buffer-flushing read 1463 for n in cycle([1, 19]): 1464 s = bufio.read(n) 1465 if not s: 1466 break 1467 # list.append() is atomic 1468 results.append(s) 1469 except Exception as e: 1470 errors.append(e) 1471 raise 1472 threads = [threading.Thread(target=f) for x in range(20)] 1473 with support.start_threads(threads): 1474 time.sleep(0.02) # yield 1475 self.assertFalse(errors, 1476 "the following exceptions were caught: %r" % errors) 1477 s = b''.join(results) 1478 for i in range(256): 1479 c = bytes(bytearray([i])) 1480 self.assertEqual(s.count(c), N) 1481 finally: 1482 support.unlink(support.TESTFN) 1483 1484 def test_unseekable(self): 1485 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1486 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1487 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1488 bufio.read(1) 1489 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1490 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1491 1492 def test_misbehaved_io(self): 1493 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1494 bufio = self.tp(rawio) 1495 self.assertRaises(OSError, bufio.seek, 0) 1496 self.assertRaises(OSError, bufio.tell) 1497 1498 # Silence destructor error 1499 bufio.close = lambda: None 1500 1501 def test_no_extraneous_read(self): 1502 # Issue #9550; when the raw IO object has satisfied the read request, 1503 # we should not issue any additional reads, otherwise it may block 1504 # (e.g. socket). 1505 bufsize = 16 1506 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1507 rawio = self.MockRawIO([b"x" * n]) 1508 bufio = self.tp(rawio, bufsize) 1509 self.assertEqual(bufio.read(n), b"x" * n) 1510 # Simple case: one raw read is enough to satisfy the request. 1511 self.assertEqual(rawio._extraneous_reads, 0, 1512 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1513 # A more complex case where two raw reads are needed to satisfy 1514 # the request. 1515 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1516 bufio = self.tp(rawio, bufsize) 1517 self.assertEqual(bufio.read(n), b"x" * n) 1518 self.assertEqual(rawio._extraneous_reads, 0, 1519 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1520 1521 def test_read_on_closed(self): 1522 # Issue #23796 1523 b = io.BufferedReader(io.BytesIO(b"12")) 1524 b.read(1) 1525 b.close() 1526 self.assertRaises(ValueError, b.peek) 1527 self.assertRaises(ValueError, b.read1, 1) 1528 1529 1530class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1531 tp = io.BufferedReader 1532 1533 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 1534 "instead of returning NULL for malloc failure.") 1535 def test_constructor(self): 1536 BufferedReaderTest.test_constructor(self) 1537 # The allocation can succeed on 32-bit builds, e.g. with more 1538 # than 2 GiB RAM and a 64-bit kernel. 1539 if sys.maxsize > 0x7FFFFFFF: 1540 rawio = self.MockRawIO() 1541 bufio = self.tp(rawio) 1542 self.assertRaises((OverflowError, MemoryError, ValueError), 1543 bufio.__init__, rawio, sys.maxsize) 1544 1545 def test_initialization(self): 1546 rawio = self.MockRawIO([b"abc"]) 1547 bufio = self.tp(rawio) 1548 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1549 self.assertRaises(ValueError, bufio.read) 1550 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1551 self.assertRaises(ValueError, bufio.read) 1552 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1553 self.assertRaises(ValueError, bufio.read) 1554 1555 def test_misbehaved_io_read(self): 1556 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1557 bufio = self.tp(rawio) 1558 # _pyio.BufferedReader seems to implement reading different, so that 1559 # checking this is not so easy. 1560 self.assertRaises(OSError, bufio.read, 10) 1561 1562 def test_garbage_collection(self): 1563 # C BufferedReader objects are collected. 1564 # The Python version has __del__, so it ends into gc.garbage instead 1565 self.addCleanup(support.unlink, support.TESTFN) 1566 with support.check_warnings(('', ResourceWarning)): 1567 rawio = self.FileIO(support.TESTFN, "w+b") 1568 f = self.tp(rawio) 1569 f.f = f 1570 wr = weakref.ref(f) 1571 del f 1572 support.gc_collect() 1573 self.assertIsNone(wr(), wr) 1574 1575 def test_args_error(self): 1576 # Issue #17275 1577 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1578 self.tp(io.BytesIO(), 1024, 1024, 1024) 1579 1580 1581class PyBufferedReaderTest(BufferedReaderTest): 1582 tp = pyio.BufferedReader 1583 1584 1585class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1586 write_mode = "wb" 1587 1588 def test_constructor(self): 1589 rawio = self.MockRawIO() 1590 bufio = self.tp(rawio) 1591 bufio.__init__(rawio) 1592 bufio.__init__(rawio, buffer_size=1024) 1593 bufio.__init__(rawio, buffer_size=16) 1594 self.assertEqual(3, bufio.write(b"abc")) 1595 bufio.flush() 1596 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1597 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1598 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1599 bufio.__init__(rawio) 1600 self.assertEqual(3, bufio.write(b"ghi")) 1601 bufio.flush() 1602 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1603 1604 def test_uninitialized(self): 1605 bufio = self.tp.__new__(self.tp) 1606 del bufio 1607 bufio = self.tp.__new__(self.tp) 1608 self.assertRaisesRegex((ValueError, AttributeError), 1609 'uninitialized|has no attribute', 1610 bufio.write, b'') 1611 bufio.__init__(self.MockRawIO()) 1612 self.assertEqual(bufio.write(b''), 0) 1613 1614 def test_detach_flush(self): 1615 raw = self.MockRawIO() 1616 buf = self.tp(raw) 1617 buf.write(b"howdy!") 1618 self.assertFalse(raw._write_stack) 1619 buf.detach() 1620 self.assertEqual(raw._write_stack, [b"howdy!"]) 1621 1622 def test_write(self): 1623 # Write to the buffered IO but don't overflow the buffer. 1624 writer = self.MockRawIO() 1625 bufio = self.tp(writer, 8) 1626 bufio.write(b"abc") 1627 self.assertFalse(writer._write_stack) 1628 buffer = bytearray(b"def") 1629 bufio.write(buffer) 1630 buffer[:] = b"***" # Overwrite our copy of the data 1631 bufio.flush() 1632 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1633 1634 def test_write_overflow(self): 1635 writer = self.MockRawIO() 1636 bufio = self.tp(writer, 8) 1637 contents = b"abcdefghijklmnop" 1638 for n in range(0, len(contents), 3): 1639 bufio.write(contents[n:n+3]) 1640 flushed = b"".join(writer._write_stack) 1641 # At least (total - 8) bytes were implicitly flushed, perhaps more 1642 # depending on the implementation. 1643 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1644 1645 def check_writes(self, intermediate_func): 1646 # Lots of writes, test the flushed output is as expected. 1647 contents = bytes(range(256)) * 1000 1648 n = 0 1649 writer = self.MockRawIO() 1650 bufio = self.tp(writer, 13) 1651 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1652 def gen_sizes(): 1653 for size in count(1): 1654 for i in range(15): 1655 yield size 1656 sizes = gen_sizes() 1657 while n < len(contents): 1658 size = min(next(sizes), len(contents) - n) 1659 self.assertEqual(bufio.write(contents[n:n+size]), size) 1660 intermediate_func(bufio) 1661 n += size 1662 bufio.flush() 1663 self.assertEqual(contents, b"".join(writer._write_stack)) 1664 1665 def test_writes(self): 1666 self.check_writes(lambda bufio: None) 1667 1668 def test_writes_and_flushes(self): 1669 self.check_writes(lambda bufio: bufio.flush()) 1670 1671 def test_writes_and_seeks(self): 1672 def _seekabs(bufio): 1673 pos = bufio.tell() 1674 bufio.seek(pos + 1, 0) 1675 bufio.seek(pos - 1, 0) 1676 bufio.seek(pos, 0) 1677 self.check_writes(_seekabs) 1678 def _seekrel(bufio): 1679 pos = bufio.seek(0, 1) 1680 bufio.seek(+1, 1) 1681 bufio.seek(-1, 1) 1682 bufio.seek(pos, 0) 1683 self.check_writes(_seekrel) 1684 1685 def test_writes_and_truncates(self): 1686 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1687 1688 def test_write_non_blocking(self): 1689 raw = self.MockNonBlockWriterIO() 1690 bufio = self.tp(raw, 8) 1691 1692 self.assertEqual(bufio.write(b"abcd"), 4) 1693 self.assertEqual(bufio.write(b"efghi"), 5) 1694 # 1 byte will be written, the rest will be buffered 1695 raw.block_on(b"k") 1696 self.assertEqual(bufio.write(b"jklmn"), 5) 1697 1698 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1699 raw.block_on(b"0") 1700 try: 1701 bufio.write(b"opqrwxyz0123456789") 1702 except self.BlockingIOError as e: 1703 written = e.characters_written 1704 else: 1705 self.fail("BlockingIOError should have been raised") 1706 self.assertEqual(written, 16) 1707 self.assertEqual(raw.pop_written(), 1708 b"abcdefghijklmnopqrwxyz") 1709 1710 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1711 s = raw.pop_written() 1712 # Previously buffered bytes were flushed 1713 self.assertTrue(s.startswith(b"01234567A"), s) 1714 1715 def test_write_and_rewind(self): 1716 raw = io.BytesIO() 1717 bufio = self.tp(raw, 4) 1718 self.assertEqual(bufio.write(b"abcdef"), 6) 1719 self.assertEqual(bufio.tell(), 6) 1720 bufio.seek(0, 0) 1721 self.assertEqual(bufio.write(b"XY"), 2) 1722 bufio.seek(6, 0) 1723 self.assertEqual(raw.getvalue(), b"XYcdef") 1724 self.assertEqual(bufio.write(b"123456"), 6) 1725 bufio.flush() 1726 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1727 1728 def test_flush(self): 1729 writer = self.MockRawIO() 1730 bufio = self.tp(writer, 8) 1731 bufio.write(b"abc") 1732 bufio.flush() 1733 self.assertEqual(b"abc", writer._write_stack[0]) 1734 1735 def test_writelines(self): 1736 l = [b'ab', b'cd', b'ef'] 1737 writer = self.MockRawIO() 1738 bufio = self.tp(writer, 8) 1739 bufio.writelines(l) 1740 bufio.flush() 1741 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1742 1743 def test_writelines_userlist(self): 1744 l = UserList([b'ab', b'cd', b'ef']) 1745 writer = self.MockRawIO() 1746 bufio = self.tp(writer, 8) 1747 bufio.writelines(l) 1748 bufio.flush() 1749 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1750 1751 def test_writelines_error(self): 1752 writer = self.MockRawIO() 1753 bufio = self.tp(writer, 8) 1754 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1755 self.assertRaises(TypeError, bufio.writelines, None) 1756 self.assertRaises(TypeError, bufio.writelines, 'abc') 1757 1758 def test_destructor(self): 1759 writer = self.MockRawIO() 1760 bufio = self.tp(writer, 8) 1761 bufio.write(b"abc") 1762 del bufio 1763 support.gc_collect() 1764 self.assertEqual(b"abc", writer._write_stack[0]) 1765 1766 def test_truncate(self): 1767 # Truncate implicitly flushes the buffer. 1768 self.addCleanup(support.unlink, support.TESTFN) 1769 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1770 bufio = self.tp(raw, 8) 1771 bufio.write(b"abcdef") 1772 self.assertEqual(bufio.truncate(3), 3) 1773 self.assertEqual(bufio.tell(), 6) 1774 with self.open(support.TESTFN, "rb", buffering=0) as f: 1775 self.assertEqual(f.read(), b"abc") 1776 1777 def test_truncate_after_write(self): 1778 # Ensure that truncate preserves the file position after 1779 # writes longer than the buffer size. 1780 # Issue: https://bugs.python.org/issue32228 1781 self.addCleanup(support.unlink, support.TESTFN) 1782 with self.open(support.TESTFN, "wb") as f: 1783 # Fill with some buffer 1784 f.write(b'\x00' * 10000) 1785 buffer_sizes = [8192, 4096, 200] 1786 for buffer_size in buffer_sizes: 1787 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f: 1788 f.write(b'\x00' * (buffer_size + 1)) 1789 # After write write_pos and write_end are set to 0 1790 f.read(1) 1791 # read operation makes sure that pos != raw_pos 1792 f.truncate() 1793 self.assertEqual(f.tell(), buffer_size + 2) 1794 1795 @support.requires_resource('cpu') 1796 def test_threads(self): 1797 try: 1798 # Write out many bytes from many threads and test they were 1799 # all flushed. 1800 N = 1000 1801 contents = bytes(range(256)) * N 1802 sizes = cycle([1, 19]) 1803 n = 0 1804 queue = deque() 1805 while n < len(contents): 1806 size = next(sizes) 1807 queue.append(contents[n:n+size]) 1808 n += size 1809 del contents 1810 # We use a real file object because it allows us to 1811 # exercise situations where the GIL is released before 1812 # writing the buffer to the raw streams. This is in addition 1813 # to concurrency issues due to switching threads in the middle 1814 # of Python code. 1815 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1816 bufio = self.tp(raw, 8) 1817 errors = [] 1818 def f(): 1819 try: 1820 while True: 1821 try: 1822 s = queue.popleft() 1823 except IndexError: 1824 return 1825 bufio.write(s) 1826 except Exception as e: 1827 errors.append(e) 1828 raise 1829 threads = [threading.Thread(target=f) for x in range(20)] 1830 with support.start_threads(threads): 1831 time.sleep(0.02) # yield 1832 self.assertFalse(errors, 1833 "the following exceptions were caught: %r" % errors) 1834 bufio.close() 1835 with self.open(support.TESTFN, "rb") as f: 1836 s = f.read() 1837 for i in range(256): 1838 self.assertEqual(s.count(bytes([i])), N) 1839 finally: 1840 support.unlink(support.TESTFN) 1841 1842 def test_misbehaved_io(self): 1843 rawio = self.MisbehavedRawIO() 1844 bufio = self.tp(rawio, 5) 1845 self.assertRaises(OSError, bufio.seek, 0) 1846 self.assertRaises(OSError, bufio.tell) 1847 self.assertRaises(OSError, bufio.write, b"abcdef") 1848 1849 # Silence destructor error 1850 bufio.close = lambda: None 1851 1852 def test_max_buffer_size_removal(self): 1853 with self.assertRaises(TypeError): 1854 self.tp(self.MockRawIO(), 8, 12) 1855 1856 def test_write_error_on_close(self): 1857 raw = self.MockRawIO() 1858 def bad_write(b): 1859 raise OSError() 1860 raw.write = bad_write 1861 b = self.tp(raw) 1862 b.write(b'spam') 1863 self.assertRaises(OSError, b.close) # exception not swallowed 1864 self.assertTrue(b.closed) 1865 1866 def test_slow_close_from_thread(self): 1867 # Issue #31976 1868 rawio = self.SlowFlushRawIO() 1869 bufio = self.tp(rawio, 8) 1870 t = threading.Thread(target=bufio.close) 1871 t.start() 1872 rawio.in_flush.wait() 1873 self.assertRaises(ValueError, bufio.write, b'spam') 1874 self.assertTrue(bufio.closed) 1875 t.join() 1876 1877 1878 1879class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1880 tp = io.BufferedWriter 1881 1882 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 1883 "instead of returning NULL for malloc failure.") 1884 def test_constructor(self): 1885 BufferedWriterTest.test_constructor(self) 1886 # The allocation can succeed on 32-bit builds, e.g. with more 1887 # than 2 GiB RAM and a 64-bit kernel. 1888 if sys.maxsize > 0x7FFFFFFF: 1889 rawio = self.MockRawIO() 1890 bufio = self.tp(rawio) 1891 self.assertRaises((OverflowError, MemoryError, ValueError), 1892 bufio.__init__, rawio, sys.maxsize) 1893 1894 def test_initialization(self): 1895 rawio = self.MockRawIO() 1896 bufio = self.tp(rawio) 1897 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1898 self.assertRaises(ValueError, bufio.write, b"def") 1899 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1900 self.assertRaises(ValueError, bufio.write, b"def") 1901 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1902 self.assertRaises(ValueError, bufio.write, b"def") 1903 1904 def test_garbage_collection(self): 1905 # C BufferedWriter objects are collected, and collecting them flushes 1906 # all data to disk. 1907 # The Python version has __del__, so it ends into gc.garbage instead 1908 self.addCleanup(support.unlink, support.TESTFN) 1909 with support.check_warnings(('', ResourceWarning)): 1910 rawio = self.FileIO(support.TESTFN, "w+b") 1911 f = self.tp(rawio) 1912 f.write(b"123xxx") 1913 f.x = f 1914 wr = weakref.ref(f) 1915 del f 1916 support.gc_collect() 1917 self.assertIsNone(wr(), wr) 1918 with self.open(support.TESTFN, "rb") as f: 1919 self.assertEqual(f.read(), b"123xxx") 1920 1921 def test_args_error(self): 1922 # Issue #17275 1923 with self.assertRaisesRegex(TypeError, "BufferedWriter"): 1924 self.tp(io.BytesIO(), 1024, 1024, 1024) 1925 1926 1927class PyBufferedWriterTest(BufferedWriterTest): 1928 tp = pyio.BufferedWriter 1929 1930class BufferedRWPairTest(unittest.TestCase): 1931 1932 def test_constructor(self): 1933 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1934 self.assertFalse(pair.closed) 1935 1936 def test_uninitialized(self): 1937 pair = self.tp.__new__(self.tp) 1938 del pair 1939 pair = self.tp.__new__(self.tp) 1940 self.assertRaisesRegex((ValueError, AttributeError), 1941 'uninitialized|has no attribute', 1942 pair.read, 0) 1943 self.assertRaisesRegex((ValueError, AttributeError), 1944 'uninitialized|has no attribute', 1945 pair.write, b'') 1946 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1947 self.assertEqual(pair.read(0), b'') 1948 self.assertEqual(pair.write(b''), 0) 1949 1950 def test_detach(self): 1951 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1952 self.assertRaises(self.UnsupportedOperation, pair.detach) 1953 1954 def test_constructor_max_buffer_size_removal(self): 1955 with self.assertRaises(TypeError): 1956 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1957 1958 def test_constructor_with_not_readable(self): 1959 class NotReadable(MockRawIO): 1960 def readable(self): 1961 return False 1962 1963 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO()) 1964 1965 def test_constructor_with_not_writeable(self): 1966 class NotWriteable(MockRawIO): 1967 def writable(self): 1968 return False 1969 1970 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable()) 1971 1972 def test_read(self): 1973 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1974 1975 self.assertEqual(pair.read(3), b"abc") 1976 self.assertEqual(pair.read(1), b"d") 1977 self.assertEqual(pair.read(), b"ef") 1978 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1979 self.assertEqual(pair.read(None), b"abc") 1980 1981 def test_readlines(self): 1982 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1983 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1984 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1985 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1986 1987 def test_read1(self): 1988 # .read1() is delegated to the underlying reader object, so this test 1989 # can be shallow. 1990 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1991 1992 self.assertEqual(pair.read1(3), b"abc") 1993 self.assertEqual(pair.read1(), b"def") 1994 1995 def test_readinto(self): 1996 for method in ("readinto", "readinto1"): 1997 with self.subTest(method): 1998 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1999 2000 data = byteslike(b'\0' * 5) 2001 self.assertEqual(getattr(pair, method)(data), 5) 2002 self.assertEqual(bytes(data), b"abcde") 2003 2004 def test_write(self): 2005 w = self.MockRawIO() 2006 pair = self.tp(self.MockRawIO(), w) 2007 2008 pair.write(b"abc") 2009 pair.flush() 2010 buffer = bytearray(b"def") 2011 pair.write(buffer) 2012 buffer[:] = b"***" # Overwrite our copy of the data 2013 pair.flush() 2014 self.assertEqual(w._write_stack, [b"abc", b"def"]) 2015 2016 def test_peek(self): 2017 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2018 2019 self.assertTrue(pair.peek(3).startswith(b"abc")) 2020 self.assertEqual(pair.read(3), b"abc") 2021 2022 def test_readable(self): 2023 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2024 self.assertTrue(pair.readable()) 2025 2026 def test_writeable(self): 2027 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2028 self.assertTrue(pair.writable()) 2029 2030 def test_seekable(self): 2031 # BufferedRWPairs are never seekable, even if their readers and writers 2032 # are. 2033 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2034 self.assertFalse(pair.seekable()) 2035 2036 # .flush() is delegated to the underlying writer object and has been 2037 # tested in the test_write method. 2038 2039 def test_close_and_closed(self): 2040 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2041 self.assertFalse(pair.closed) 2042 pair.close() 2043 self.assertTrue(pair.closed) 2044 2045 def test_reader_close_error_on_close(self): 2046 def reader_close(): 2047 reader_non_existing 2048 reader = self.MockRawIO() 2049 reader.close = reader_close 2050 writer = self.MockRawIO() 2051 pair = self.tp(reader, writer) 2052 with self.assertRaises(NameError) as err: 2053 pair.close() 2054 self.assertIn('reader_non_existing', str(err.exception)) 2055 self.assertTrue(pair.closed) 2056 self.assertFalse(reader.closed) 2057 self.assertTrue(writer.closed) 2058 2059 # Silence destructor error 2060 reader.close = lambda: None 2061 2062 def test_writer_close_error_on_close(self): 2063 def writer_close(): 2064 writer_non_existing 2065 reader = self.MockRawIO() 2066 writer = self.MockRawIO() 2067 writer.close = writer_close 2068 pair = self.tp(reader, writer) 2069 with self.assertRaises(NameError) as err: 2070 pair.close() 2071 self.assertIn('writer_non_existing', str(err.exception)) 2072 self.assertFalse(pair.closed) 2073 self.assertTrue(reader.closed) 2074 self.assertFalse(writer.closed) 2075 2076 # Silence destructor error 2077 writer.close = lambda: None 2078 writer = None 2079 2080 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception 2081 with support.catch_unraisable_exception(): 2082 # Ignore BufferedRWPair unraisable exception 2083 with support.catch_unraisable_exception(): 2084 pair = None 2085 support.gc_collect() 2086 support.gc_collect() 2087 2088 def test_reader_writer_close_error_on_close(self): 2089 def reader_close(): 2090 reader_non_existing 2091 def writer_close(): 2092 writer_non_existing 2093 reader = self.MockRawIO() 2094 reader.close = reader_close 2095 writer = self.MockRawIO() 2096 writer.close = writer_close 2097 pair = self.tp(reader, writer) 2098 with self.assertRaises(NameError) as err: 2099 pair.close() 2100 self.assertIn('reader_non_existing', str(err.exception)) 2101 self.assertIsInstance(err.exception.__context__, NameError) 2102 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2103 self.assertFalse(pair.closed) 2104 self.assertFalse(reader.closed) 2105 self.assertFalse(writer.closed) 2106 2107 # Silence destructor error 2108 reader.close = lambda: None 2109 writer.close = lambda: None 2110 2111 def test_isatty(self): 2112 class SelectableIsAtty(MockRawIO): 2113 def __init__(self, isatty): 2114 MockRawIO.__init__(self) 2115 self._isatty = isatty 2116 2117 def isatty(self): 2118 return self._isatty 2119 2120 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2121 self.assertFalse(pair.isatty()) 2122 2123 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2124 self.assertTrue(pair.isatty()) 2125 2126 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2127 self.assertTrue(pair.isatty()) 2128 2129 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2130 self.assertTrue(pair.isatty()) 2131 2132 def test_weakref_clearing(self): 2133 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2134 ref = weakref.ref(brw) 2135 brw = None 2136 ref = None # Shouldn't segfault. 2137 2138class CBufferedRWPairTest(BufferedRWPairTest): 2139 tp = io.BufferedRWPair 2140 2141class PyBufferedRWPairTest(BufferedRWPairTest): 2142 tp = pyio.BufferedRWPair 2143 2144 2145class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 2146 read_mode = "rb+" 2147 write_mode = "wb+" 2148 2149 def test_constructor(self): 2150 BufferedReaderTest.test_constructor(self) 2151 BufferedWriterTest.test_constructor(self) 2152 2153 def test_uninitialized(self): 2154 BufferedReaderTest.test_uninitialized(self) 2155 BufferedWriterTest.test_uninitialized(self) 2156 2157 def test_read_and_write(self): 2158 raw = self.MockRawIO((b"asdf", b"ghjk")) 2159 rw = self.tp(raw, 8) 2160 2161 self.assertEqual(b"as", rw.read(2)) 2162 rw.write(b"ddd") 2163 rw.write(b"eee") 2164 self.assertFalse(raw._write_stack) # Buffer writes 2165 self.assertEqual(b"ghjk", rw.read()) 2166 self.assertEqual(b"dddeee", raw._write_stack[0]) 2167 2168 def test_seek_and_tell(self): 2169 raw = self.BytesIO(b"asdfghjkl") 2170 rw = self.tp(raw) 2171 2172 self.assertEqual(b"as", rw.read(2)) 2173 self.assertEqual(2, rw.tell()) 2174 rw.seek(0, 0) 2175 self.assertEqual(b"asdf", rw.read(4)) 2176 2177 rw.write(b"123f") 2178 rw.seek(0, 0) 2179 self.assertEqual(b"asdf123fl", rw.read()) 2180 self.assertEqual(9, rw.tell()) 2181 rw.seek(-4, 2) 2182 self.assertEqual(5, rw.tell()) 2183 rw.seek(2, 1) 2184 self.assertEqual(7, rw.tell()) 2185 self.assertEqual(b"fl", rw.read(11)) 2186 rw.flush() 2187 self.assertEqual(b"asdf123fl", raw.getvalue()) 2188 2189 self.assertRaises(TypeError, rw.seek, 0.0) 2190 2191 def check_flush_and_read(self, read_func): 2192 raw = self.BytesIO(b"abcdefghi") 2193 bufio = self.tp(raw) 2194 2195 self.assertEqual(b"ab", read_func(bufio, 2)) 2196 bufio.write(b"12") 2197 self.assertEqual(b"ef", read_func(bufio, 2)) 2198 self.assertEqual(6, bufio.tell()) 2199 bufio.flush() 2200 self.assertEqual(6, bufio.tell()) 2201 self.assertEqual(b"ghi", read_func(bufio)) 2202 raw.seek(0, 0) 2203 raw.write(b"XYZ") 2204 # flush() resets the read buffer 2205 bufio.flush() 2206 bufio.seek(0, 0) 2207 self.assertEqual(b"XYZ", read_func(bufio, 3)) 2208 2209 def test_flush_and_read(self): 2210 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 2211 2212 def test_flush_and_readinto(self): 2213 def _readinto(bufio, n=-1): 2214 b = bytearray(n if n >= 0 else 9999) 2215 n = bufio.readinto(b) 2216 return bytes(b[:n]) 2217 self.check_flush_and_read(_readinto) 2218 2219 def test_flush_and_peek(self): 2220 def _peek(bufio, n=-1): 2221 # This relies on the fact that the buffer can contain the whole 2222 # raw stream, otherwise peek() can return less. 2223 b = bufio.peek(n) 2224 if n != -1: 2225 b = b[:n] 2226 bufio.seek(len(b), 1) 2227 return b 2228 self.check_flush_and_read(_peek) 2229 2230 def test_flush_and_write(self): 2231 raw = self.BytesIO(b"abcdefghi") 2232 bufio = self.tp(raw) 2233 2234 bufio.write(b"123") 2235 bufio.flush() 2236 bufio.write(b"45") 2237 bufio.flush() 2238 bufio.seek(0, 0) 2239 self.assertEqual(b"12345fghi", raw.getvalue()) 2240 self.assertEqual(b"12345fghi", bufio.read()) 2241 2242 def test_threads(self): 2243 BufferedReaderTest.test_threads(self) 2244 BufferedWriterTest.test_threads(self) 2245 2246 def test_writes_and_peek(self): 2247 def _peek(bufio): 2248 bufio.peek(1) 2249 self.check_writes(_peek) 2250 def _peek(bufio): 2251 pos = bufio.tell() 2252 bufio.seek(-1, 1) 2253 bufio.peek(1) 2254 bufio.seek(pos, 0) 2255 self.check_writes(_peek) 2256 2257 def test_writes_and_reads(self): 2258 def _read(bufio): 2259 bufio.seek(-1, 1) 2260 bufio.read(1) 2261 self.check_writes(_read) 2262 2263 def test_writes_and_read1s(self): 2264 def _read1(bufio): 2265 bufio.seek(-1, 1) 2266 bufio.read1(1) 2267 self.check_writes(_read1) 2268 2269 def test_writes_and_readintos(self): 2270 def _read(bufio): 2271 bufio.seek(-1, 1) 2272 bufio.readinto(bytearray(1)) 2273 self.check_writes(_read) 2274 2275 def test_write_after_readahead(self): 2276 # Issue #6629: writing after the buffer was filled by readahead should 2277 # first rewind the raw stream. 2278 for overwrite_size in [1, 5]: 2279 raw = self.BytesIO(b"A" * 10) 2280 bufio = self.tp(raw, 4) 2281 # Trigger readahead 2282 self.assertEqual(bufio.read(1), b"A") 2283 self.assertEqual(bufio.tell(), 1) 2284 # Overwriting should rewind the raw stream if it needs so 2285 bufio.write(b"B" * overwrite_size) 2286 self.assertEqual(bufio.tell(), overwrite_size + 1) 2287 # If the write size was smaller than the buffer size, flush() and 2288 # check that rewind happens. 2289 bufio.flush() 2290 self.assertEqual(bufio.tell(), overwrite_size + 1) 2291 s = raw.getvalue() 2292 self.assertEqual(s, 2293 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 2294 2295 def test_write_rewind_write(self): 2296 # Various combinations of reading / writing / seeking backwards / writing again 2297 def mutate(bufio, pos1, pos2): 2298 assert pos2 >= pos1 2299 # Fill the buffer 2300 bufio.seek(pos1) 2301 bufio.read(pos2 - pos1) 2302 bufio.write(b'\x02') 2303 # This writes earlier than the previous write, but still inside 2304 # the buffer. 2305 bufio.seek(pos1) 2306 bufio.write(b'\x01') 2307 2308 b = b"\x80\x81\x82\x83\x84" 2309 for i in range(0, len(b)): 2310 for j in range(i, len(b)): 2311 raw = self.BytesIO(b) 2312 bufio = self.tp(raw, 100) 2313 mutate(bufio, i, j) 2314 bufio.flush() 2315 expected = bytearray(b) 2316 expected[j] = 2 2317 expected[i] = 1 2318 self.assertEqual(raw.getvalue(), expected, 2319 "failed result for i=%d, j=%d" % (i, j)) 2320 2321 def test_truncate_after_read_or_write(self): 2322 raw = self.BytesIO(b"A" * 10) 2323 bufio = self.tp(raw, 100) 2324 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 2325 self.assertEqual(bufio.truncate(), 2) 2326 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 2327 self.assertEqual(bufio.truncate(), 4) 2328 2329 def test_misbehaved_io(self): 2330 BufferedReaderTest.test_misbehaved_io(self) 2331 BufferedWriterTest.test_misbehaved_io(self) 2332 2333 def test_interleaved_read_write(self): 2334 # Test for issue #12213 2335 with self.BytesIO(b'abcdefgh') as raw: 2336 with self.tp(raw, 100) as f: 2337 f.write(b"1") 2338 self.assertEqual(f.read(1), b'b') 2339 f.write(b'2') 2340 self.assertEqual(f.read1(1), b'd') 2341 f.write(b'3') 2342 buf = bytearray(1) 2343 f.readinto(buf) 2344 self.assertEqual(buf, b'f') 2345 f.write(b'4') 2346 self.assertEqual(f.peek(1), b'h') 2347 f.flush() 2348 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 2349 2350 with self.BytesIO(b'abc') as raw: 2351 with self.tp(raw, 100) as f: 2352 self.assertEqual(f.read(1), b'a') 2353 f.write(b"2") 2354 self.assertEqual(f.read(1), b'c') 2355 f.flush() 2356 self.assertEqual(raw.getvalue(), b'a2c') 2357 2358 def test_interleaved_readline_write(self): 2359 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 2360 with self.tp(raw) as f: 2361 f.write(b'1') 2362 self.assertEqual(f.readline(), b'b\n') 2363 f.write(b'2') 2364 self.assertEqual(f.readline(), b'def\n') 2365 f.write(b'3') 2366 self.assertEqual(f.readline(), b'\n') 2367 f.flush() 2368 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 2369 2370 # You can't construct a BufferedRandom over a non-seekable stream. 2371 test_unseekable = None 2372 2373 2374class CBufferedRandomTest(BufferedRandomTest, SizeofTest): 2375 tp = io.BufferedRandom 2376 2377 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 2378 "instead of returning NULL for malloc failure.") 2379 def test_constructor(self): 2380 BufferedRandomTest.test_constructor(self) 2381 # The allocation can succeed on 32-bit builds, e.g. with more 2382 # than 2 GiB RAM and a 64-bit kernel. 2383 if sys.maxsize > 0x7FFFFFFF: 2384 rawio = self.MockRawIO() 2385 bufio = self.tp(rawio) 2386 self.assertRaises((OverflowError, MemoryError, ValueError), 2387 bufio.__init__, rawio, sys.maxsize) 2388 2389 def test_garbage_collection(self): 2390 CBufferedReaderTest.test_garbage_collection(self) 2391 CBufferedWriterTest.test_garbage_collection(self) 2392 2393 def test_args_error(self): 2394 # Issue #17275 2395 with self.assertRaisesRegex(TypeError, "BufferedRandom"): 2396 self.tp(io.BytesIO(), 1024, 1024, 1024) 2397 2398 2399class PyBufferedRandomTest(BufferedRandomTest): 2400 tp = pyio.BufferedRandom 2401 2402 2403# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 2404# properties: 2405# - A single output character can correspond to many bytes of input. 2406# - The number of input bytes to complete the character can be 2407# undetermined until the last input byte is received. 2408# - The number of input bytes can vary depending on previous input. 2409# - A single input byte can correspond to many characters of output. 2410# - The number of output characters can be undetermined until the 2411# last input byte is received. 2412# - The number of output characters can vary depending on previous input. 2413 2414class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2415 """ 2416 For testing seek/tell behavior with a stateful, buffering decoder. 2417 2418 Input is a sequence of words. Words may be fixed-length (length set 2419 by input) or variable-length (period-terminated). In variable-length 2420 mode, extra periods are ignored. Possible words are: 2421 - 'i' followed by a number sets the input length, I (maximum 99). 2422 When I is set to 0, words are space-terminated. 2423 - 'o' followed by a number sets the output length, O (maximum 99). 2424 - Any other word is converted into a word followed by a period on 2425 the output. The output word consists of the input word truncated 2426 or padded out with hyphens to make its length equal to O. If O 2427 is 0, the word is output verbatim without truncating or padding. 2428 I and O are initially set to 1. When I changes, any buffered input is 2429 re-scanned according to the new I. EOF also terminates the last word. 2430 """ 2431 2432 def __init__(self, errors='strict'): 2433 codecs.IncrementalDecoder.__init__(self, errors) 2434 self.reset() 2435 2436 def __repr__(self): 2437 return '<SID %x>' % id(self) 2438 2439 def reset(self): 2440 self.i = 1 2441 self.o = 1 2442 self.buffer = bytearray() 2443 2444 def getstate(self): 2445 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2446 return bytes(self.buffer), i*100 + o 2447 2448 def setstate(self, state): 2449 buffer, io = state 2450 self.buffer = bytearray(buffer) 2451 i, o = divmod(io, 100) 2452 self.i, self.o = i ^ 1, o ^ 1 2453 2454 def decode(self, input, final=False): 2455 output = '' 2456 for b in input: 2457 if self.i == 0: # variable-length, terminated with period 2458 if b == ord('.'): 2459 if self.buffer: 2460 output += self.process_word() 2461 else: 2462 self.buffer.append(b) 2463 else: # fixed-length, terminate after self.i bytes 2464 self.buffer.append(b) 2465 if len(self.buffer) == self.i: 2466 output += self.process_word() 2467 if final and self.buffer: # EOF terminates the last word 2468 output += self.process_word() 2469 return output 2470 2471 def process_word(self): 2472 output = '' 2473 if self.buffer[0] == ord('i'): 2474 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2475 elif self.buffer[0] == ord('o'): 2476 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2477 else: 2478 output = self.buffer.decode('ascii') 2479 if len(output) < self.o: 2480 output += '-'*self.o # pad out with hyphens 2481 if self.o: 2482 output = output[:self.o] # truncate to output length 2483 output += '.' 2484 self.buffer = bytearray() 2485 return output 2486 2487 codecEnabled = False 2488 2489 @classmethod 2490 def lookupTestDecoder(cls, name): 2491 if cls.codecEnabled and name == 'test_decoder': 2492 latin1 = codecs.lookup('latin-1') 2493 return codecs.CodecInfo( 2494 name='test_decoder', encode=latin1.encode, decode=None, 2495 incrementalencoder=None, 2496 streamreader=None, streamwriter=None, 2497 incrementaldecoder=cls) 2498 2499# Register the previous decoder for testing. 2500# Disabled by default, tests will enable it. 2501codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 2502 2503 2504class StatefulIncrementalDecoderTest(unittest.TestCase): 2505 """ 2506 Make sure the StatefulIncrementalDecoder actually works. 2507 """ 2508 2509 test_cases = [ 2510 # I=1, O=1 (fixed-length input == fixed-length output) 2511 (b'abcd', False, 'a.b.c.d.'), 2512 # I=0, O=0 (variable-length input, variable-length output) 2513 (b'oiabcd', True, 'abcd.'), 2514 # I=0, O=0 (should ignore extra periods) 2515 (b'oi...abcd...', True, 'abcd.'), 2516 # I=0, O=6 (variable-length input, fixed-length output) 2517 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2518 # I=2, O=6 (fixed-length input < fixed-length output) 2519 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2520 # I=6, O=3 (fixed-length input > fixed-length output) 2521 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2522 # I=0, then 3; O=29, then 15 (with longer output) 2523 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2524 'a----------------------------.' + 2525 'b----------------------------.' + 2526 'cde--------------------------.' + 2527 'abcdefghijabcde.' + 2528 'a.b------------.' + 2529 '.c.------------.' + 2530 'd.e------------.' + 2531 'k--------------.' + 2532 'l--------------.' + 2533 'm--------------.') 2534 ] 2535 2536 def test_decoder(self): 2537 # Try a few one-shot test cases. 2538 for input, eof, output in self.test_cases: 2539 d = StatefulIncrementalDecoder() 2540 self.assertEqual(d.decode(input, eof), output) 2541 2542 # Also test an unfinished decode, followed by forcing EOF. 2543 d = StatefulIncrementalDecoder() 2544 self.assertEqual(d.decode(b'oiabcd'), '') 2545 self.assertEqual(d.decode(b'', 1), 'abcd.') 2546 2547class TextIOWrapperTest(unittest.TestCase): 2548 2549 def setUp(self): 2550 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2551 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2552 support.unlink(support.TESTFN) 2553 2554 def tearDown(self): 2555 support.unlink(support.TESTFN) 2556 2557 def test_constructor(self): 2558 r = self.BytesIO(b"\xc3\xa9\n\n") 2559 b = self.BufferedReader(r, 1000) 2560 t = self.TextIOWrapper(b) 2561 t.__init__(b, encoding="latin-1", newline="\r\n") 2562 self.assertEqual(t.encoding, "latin-1") 2563 self.assertEqual(t.line_buffering, False) 2564 t.__init__(b, encoding="utf-8", line_buffering=True) 2565 self.assertEqual(t.encoding, "utf-8") 2566 self.assertEqual(t.line_buffering, True) 2567 self.assertEqual("\xe9\n", t.readline()) 2568 self.assertRaises(TypeError, t.__init__, b, newline=42) 2569 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2570 2571 def test_uninitialized(self): 2572 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2573 del t 2574 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2575 self.assertRaises(Exception, repr, t) 2576 self.assertRaisesRegex((ValueError, AttributeError), 2577 'uninitialized|has no attribute', 2578 t.read, 0) 2579 t.__init__(self.MockRawIO()) 2580 self.assertEqual(t.read(0), '') 2581 2582 def test_non_text_encoding_codecs_are_rejected(self): 2583 # Ensure the constructor complains if passed a codec that isn't 2584 # marked as a text encoding 2585 # http://bugs.python.org/issue20404 2586 r = self.BytesIO() 2587 b = self.BufferedWriter(r) 2588 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2589 self.TextIOWrapper(b, encoding="hex") 2590 2591 def test_detach(self): 2592 r = self.BytesIO() 2593 b = self.BufferedWriter(r) 2594 t = self.TextIOWrapper(b) 2595 self.assertIs(t.detach(), b) 2596 2597 t = self.TextIOWrapper(b, encoding="ascii") 2598 t.write("howdy") 2599 self.assertFalse(r.getvalue()) 2600 t.detach() 2601 self.assertEqual(r.getvalue(), b"howdy") 2602 self.assertRaises(ValueError, t.detach) 2603 2604 # Operations independent of the detached stream should still work 2605 repr(t) 2606 self.assertEqual(t.encoding, "ascii") 2607 self.assertEqual(t.errors, "strict") 2608 self.assertFalse(t.line_buffering) 2609 self.assertFalse(t.write_through) 2610 2611 def test_repr(self): 2612 raw = self.BytesIO("hello".encode("utf-8")) 2613 b = self.BufferedReader(raw) 2614 t = self.TextIOWrapper(b, encoding="utf-8") 2615 modname = self.TextIOWrapper.__module__ 2616 self.assertRegex(repr(t), 2617 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) 2618 raw.name = "dummy" 2619 self.assertRegex(repr(t), 2620 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2621 t.mode = "r" 2622 self.assertRegex(repr(t), 2623 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2624 raw.name = b"dummy" 2625 self.assertRegex(repr(t), 2626 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2627 2628 t.buffer.detach() 2629 repr(t) # Should not raise an exception 2630 2631 def test_recursive_repr(self): 2632 # Issue #25455 2633 raw = self.BytesIO() 2634 t = self.TextIOWrapper(raw) 2635 with support.swap_attr(raw, 'name', t): 2636 try: 2637 repr(t) # Should not crash 2638 except RuntimeError: 2639 pass 2640 2641 def test_line_buffering(self): 2642 r = self.BytesIO() 2643 b = self.BufferedWriter(r, 1000) 2644 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 2645 t.write("X") 2646 self.assertEqual(r.getvalue(), b"") # No flush happened 2647 t.write("Y\nZ") 2648 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2649 t.write("A\rB") 2650 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2651 2652 def test_reconfigure_line_buffering(self): 2653 r = self.BytesIO() 2654 b = self.BufferedWriter(r, 1000) 2655 t = self.TextIOWrapper(b, newline="\n", line_buffering=False) 2656 t.write("AB\nC") 2657 self.assertEqual(r.getvalue(), b"") 2658 2659 t.reconfigure(line_buffering=True) # implicit flush 2660 self.assertEqual(r.getvalue(), b"AB\nC") 2661 t.write("DEF\nG") 2662 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2663 t.write("H") 2664 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2665 t.reconfigure(line_buffering=False) # implicit flush 2666 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2667 t.write("IJ") 2668 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2669 2670 # Keeping default value 2671 t.reconfigure() 2672 t.reconfigure(line_buffering=None) 2673 self.assertEqual(t.line_buffering, False) 2674 t.reconfigure(line_buffering=True) 2675 t.reconfigure() 2676 t.reconfigure(line_buffering=None) 2677 self.assertEqual(t.line_buffering, True) 2678 2679 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2680 def test_default_encoding(self): 2681 old_environ = dict(os.environ) 2682 try: 2683 # try to get a user preferred encoding different than the current 2684 # locale encoding to check that TextIOWrapper() uses the current 2685 # locale encoding and not the user preferred encoding 2686 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2687 if key in os.environ: 2688 del os.environ[key] 2689 2690 current_locale_encoding = locale.getpreferredencoding(False) 2691 b = self.BytesIO() 2692 t = self.TextIOWrapper(b) 2693 self.assertEqual(t.encoding, current_locale_encoding) 2694 finally: 2695 os.environ.clear() 2696 os.environ.update(old_environ) 2697 2698 @support.cpython_only 2699 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2700 def test_device_encoding(self): 2701 # Issue 15989 2702 import _testcapi 2703 b = self.BytesIO() 2704 b.fileno = lambda: _testcapi.INT_MAX + 1 2705 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2706 b.fileno = lambda: _testcapi.UINT_MAX + 1 2707 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2708 2709 def test_encoding(self): 2710 # Check the encoding attribute is always set, and valid 2711 b = self.BytesIO() 2712 t = self.TextIOWrapper(b, encoding="utf-8") 2713 self.assertEqual(t.encoding, "utf-8") 2714 t = self.TextIOWrapper(b) 2715 self.assertIsNotNone(t.encoding) 2716 codecs.lookup(t.encoding) 2717 2718 def test_encoding_errors_reading(self): 2719 # (1) default 2720 b = self.BytesIO(b"abc\n\xff\n") 2721 t = self.TextIOWrapper(b, encoding="ascii") 2722 self.assertRaises(UnicodeError, t.read) 2723 # (2) explicit strict 2724 b = self.BytesIO(b"abc\n\xff\n") 2725 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2726 self.assertRaises(UnicodeError, t.read) 2727 # (3) ignore 2728 b = self.BytesIO(b"abc\n\xff\n") 2729 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2730 self.assertEqual(t.read(), "abc\n\n") 2731 # (4) replace 2732 b = self.BytesIO(b"abc\n\xff\n") 2733 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2734 self.assertEqual(t.read(), "abc\n\ufffd\n") 2735 2736 def test_encoding_errors_writing(self): 2737 # (1) default 2738 b = self.BytesIO() 2739 t = self.TextIOWrapper(b, encoding="ascii") 2740 self.assertRaises(UnicodeError, t.write, "\xff") 2741 # (2) explicit strict 2742 b = self.BytesIO() 2743 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2744 self.assertRaises(UnicodeError, t.write, "\xff") 2745 # (3) ignore 2746 b = self.BytesIO() 2747 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2748 newline="\n") 2749 t.write("abc\xffdef\n") 2750 t.flush() 2751 self.assertEqual(b.getvalue(), b"abcdef\n") 2752 # (4) replace 2753 b = self.BytesIO() 2754 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2755 newline="\n") 2756 t.write("abc\xffdef\n") 2757 t.flush() 2758 self.assertEqual(b.getvalue(), b"abc?def\n") 2759 2760 def test_newlines(self): 2761 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2762 2763 tests = [ 2764 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2765 [ '', input_lines ], 2766 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2767 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2768 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2769 ] 2770 encodings = ( 2771 'utf-8', 'latin-1', 2772 'utf-16', 'utf-16-le', 'utf-16-be', 2773 'utf-32', 'utf-32-le', 'utf-32-be', 2774 ) 2775 2776 # Try a range of buffer sizes to test the case where \r is the last 2777 # character in TextIOWrapper._pending_line. 2778 for encoding in encodings: 2779 # XXX: str.encode() should return bytes 2780 data = bytes(''.join(input_lines).encode(encoding)) 2781 for do_reads in (False, True): 2782 for bufsize in range(1, 10): 2783 for newline, exp_lines in tests: 2784 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2785 textio = self.TextIOWrapper(bufio, newline=newline, 2786 encoding=encoding) 2787 if do_reads: 2788 got_lines = [] 2789 while True: 2790 c2 = textio.read(2) 2791 if c2 == '': 2792 break 2793 self.assertEqual(len(c2), 2) 2794 got_lines.append(c2 + textio.readline()) 2795 else: 2796 got_lines = list(textio) 2797 2798 for got_line, exp_line in zip(got_lines, exp_lines): 2799 self.assertEqual(got_line, exp_line) 2800 self.assertEqual(len(got_lines), len(exp_lines)) 2801 2802 def test_newlines_input(self): 2803 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2804 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2805 for newline, expected in [ 2806 (None, normalized.decode("ascii").splitlines(keepends=True)), 2807 ("", testdata.decode("ascii").splitlines(keepends=True)), 2808 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2809 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2810 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2811 ]: 2812 buf = self.BytesIO(testdata) 2813 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2814 self.assertEqual(txt.readlines(), expected) 2815 txt.seek(0) 2816 self.assertEqual(txt.read(), "".join(expected)) 2817 2818 def test_newlines_output(self): 2819 testdict = { 2820 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2821 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2822 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2823 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2824 } 2825 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2826 for newline, expected in tests: 2827 buf = self.BytesIO() 2828 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2829 txt.write("AAA\nB") 2830 txt.write("BB\nCCC\n") 2831 txt.write("X\rY\r\nZ") 2832 txt.flush() 2833 self.assertEqual(buf.closed, False) 2834 self.assertEqual(buf.getvalue(), expected) 2835 2836 def test_destructor(self): 2837 l = [] 2838 base = self.BytesIO 2839 class MyBytesIO(base): 2840 def close(self): 2841 l.append(self.getvalue()) 2842 base.close(self) 2843 b = MyBytesIO() 2844 t = self.TextIOWrapper(b, encoding="ascii") 2845 t.write("abc") 2846 del t 2847 support.gc_collect() 2848 self.assertEqual([b"abc"], l) 2849 2850 def test_override_destructor(self): 2851 record = [] 2852 class MyTextIO(self.TextIOWrapper): 2853 def __del__(self): 2854 record.append(1) 2855 try: 2856 f = super().__del__ 2857 except AttributeError: 2858 pass 2859 else: 2860 f() 2861 def close(self): 2862 record.append(2) 2863 super().close() 2864 def flush(self): 2865 record.append(3) 2866 super().flush() 2867 b = self.BytesIO() 2868 t = MyTextIO(b, encoding="ascii") 2869 del t 2870 support.gc_collect() 2871 self.assertEqual(record, [1, 2, 3]) 2872 2873 def test_error_through_destructor(self): 2874 # Test that the exception state is not modified by a destructor, 2875 # even if close() fails. 2876 rawio = self.CloseFailureIO() 2877 with support.catch_unraisable_exception() as cm: 2878 with self.assertRaises(AttributeError): 2879 self.TextIOWrapper(rawio).xyzzy 2880 2881 if not IOBASE_EMITS_UNRAISABLE: 2882 self.assertIsNone(cm.unraisable) 2883 elif cm.unraisable is not None: 2884 self.assertEqual(cm.unraisable.exc_type, OSError) 2885 2886 # Systematic tests of the text I/O API 2887 2888 def test_basic_io(self): 2889 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2890 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2891 f = self.open(support.TESTFN, "w+", encoding=enc) 2892 f._CHUNK_SIZE = chunksize 2893 self.assertEqual(f.write("abc"), 3) 2894 f.close() 2895 f = self.open(support.TESTFN, "r+", encoding=enc) 2896 f._CHUNK_SIZE = chunksize 2897 self.assertEqual(f.tell(), 0) 2898 self.assertEqual(f.read(), "abc") 2899 cookie = f.tell() 2900 self.assertEqual(f.seek(0), 0) 2901 self.assertEqual(f.read(None), "abc") 2902 f.seek(0) 2903 self.assertEqual(f.read(2), "ab") 2904 self.assertEqual(f.read(1), "c") 2905 self.assertEqual(f.read(1), "") 2906 self.assertEqual(f.read(), "") 2907 self.assertEqual(f.tell(), cookie) 2908 self.assertEqual(f.seek(0), 0) 2909 self.assertEqual(f.seek(0, 2), cookie) 2910 self.assertEqual(f.write("def"), 3) 2911 self.assertEqual(f.seek(cookie), cookie) 2912 self.assertEqual(f.read(), "def") 2913 if enc.startswith("utf"): 2914 self.multi_line_test(f, enc) 2915 f.close() 2916 2917 def multi_line_test(self, f, enc): 2918 f.seek(0) 2919 f.truncate() 2920 sample = "s\xff\u0fff\uffff" 2921 wlines = [] 2922 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2923 chars = [] 2924 for i in range(size): 2925 chars.append(sample[i % len(sample)]) 2926 line = "".join(chars) + "\n" 2927 wlines.append((f.tell(), line)) 2928 f.write(line) 2929 f.seek(0) 2930 rlines = [] 2931 while True: 2932 pos = f.tell() 2933 line = f.readline() 2934 if not line: 2935 break 2936 rlines.append((pos, line)) 2937 self.assertEqual(rlines, wlines) 2938 2939 def test_telling(self): 2940 f = self.open(support.TESTFN, "w+", encoding="utf-8") 2941 p0 = f.tell() 2942 f.write("\xff\n") 2943 p1 = f.tell() 2944 f.write("\xff\n") 2945 p2 = f.tell() 2946 f.seek(0) 2947 self.assertEqual(f.tell(), p0) 2948 self.assertEqual(f.readline(), "\xff\n") 2949 self.assertEqual(f.tell(), p1) 2950 self.assertEqual(f.readline(), "\xff\n") 2951 self.assertEqual(f.tell(), p2) 2952 f.seek(0) 2953 for line in f: 2954 self.assertEqual(line, "\xff\n") 2955 self.assertRaises(OSError, f.tell) 2956 self.assertEqual(f.tell(), p2) 2957 f.close() 2958 2959 def test_seeking(self): 2960 chunk_size = _default_chunk_size() 2961 prefix_size = chunk_size - 2 2962 u_prefix = "a" * prefix_size 2963 prefix = bytes(u_prefix.encode("utf-8")) 2964 self.assertEqual(len(u_prefix), len(prefix)) 2965 u_suffix = "\u8888\n" 2966 suffix = bytes(u_suffix.encode("utf-8")) 2967 line = prefix + suffix 2968 with self.open(support.TESTFN, "wb") as f: 2969 f.write(line*2) 2970 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2971 s = f.read(prefix_size) 2972 self.assertEqual(s, str(prefix, "ascii")) 2973 self.assertEqual(f.tell(), prefix_size) 2974 self.assertEqual(f.readline(), u_suffix) 2975 2976 def test_seeking_too(self): 2977 # Regression test for a specific bug 2978 data = b'\xe0\xbf\xbf\n' 2979 with self.open(support.TESTFN, "wb") as f: 2980 f.write(data) 2981 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2982 f._CHUNK_SIZE # Just test that it exists 2983 f._CHUNK_SIZE = 2 2984 f.readline() 2985 f.tell() 2986 2987 def test_seek_and_tell(self): 2988 #Test seek/tell using the StatefulIncrementalDecoder. 2989 # Make test faster by doing smaller seeks 2990 CHUNK_SIZE = 128 2991 2992 def test_seek_and_tell_with_data(data, min_pos=0): 2993 """Tell/seek to various points within a data stream and ensure 2994 that the decoded data returned by read() is consistent.""" 2995 f = self.open(support.TESTFN, 'wb') 2996 f.write(data) 2997 f.close() 2998 f = self.open(support.TESTFN, encoding='test_decoder') 2999 f._CHUNK_SIZE = CHUNK_SIZE 3000 decoded = f.read() 3001 f.close() 3002 3003 for i in range(min_pos, len(decoded) + 1): # seek positions 3004 for j in [1, 5, len(decoded) - i]: # read lengths 3005 f = self.open(support.TESTFN, encoding='test_decoder') 3006 self.assertEqual(f.read(i), decoded[:i]) 3007 cookie = f.tell() 3008 self.assertEqual(f.read(j), decoded[i:i + j]) 3009 f.seek(cookie) 3010 self.assertEqual(f.read(), decoded[i:]) 3011 f.close() 3012 3013 # Enable the test decoder. 3014 StatefulIncrementalDecoder.codecEnabled = 1 3015 3016 # Run the tests. 3017 try: 3018 # Try each test case. 3019 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3020 test_seek_and_tell_with_data(input) 3021 3022 # Position each test case so that it crosses a chunk boundary. 3023 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3024 offset = CHUNK_SIZE - len(input)//2 3025 prefix = b'.'*offset 3026 # Don't bother seeking into the prefix (takes too long). 3027 min_pos = offset*2 3028 test_seek_and_tell_with_data(prefix + input, min_pos) 3029 3030 # Ensure our test decoder won't interfere with subsequent tests. 3031 finally: 3032 StatefulIncrementalDecoder.codecEnabled = 0 3033 3034 def test_multibyte_seek_and_tell(self): 3035 f = self.open(support.TESTFN, "w", encoding="euc_jp") 3036 f.write("AB\n\u3046\u3048\n") 3037 f.close() 3038 3039 f = self.open(support.TESTFN, "r", encoding="euc_jp") 3040 self.assertEqual(f.readline(), "AB\n") 3041 p0 = f.tell() 3042 self.assertEqual(f.readline(), "\u3046\u3048\n") 3043 p1 = f.tell() 3044 f.seek(p0) 3045 self.assertEqual(f.readline(), "\u3046\u3048\n") 3046 self.assertEqual(f.tell(), p1) 3047 f.close() 3048 3049 def test_seek_with_encoder_state(self): 3050 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004") 3051 f.write("\u00e6\u0300") 3052 p0 = f.tell() 3053 f.write("\u00e6") 3054 f.seek(p0) 3055 f.write("\u0300") 3056 f.close() 3057 3058 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004") 3059 self.assertEqual(f.readline(), "\u00e6\u0300\u0300") 3060 f.close() 3061 3062 def test_encoded_writes(self): 3063 data = "1234567890" 3064 tests = ("utf-16", 3065 "utf-16-le", 3066 "utf-16-be", 3067 "utf-32", 3068 "utf-32-le", 3069 "utf-32-be") 3070 for encoding in tests: 3071 buf = self.BytesIO() 3072 f = self.TextIOWrapper(buf, encoding=encoding) 3073 # Check if the BOM is written only once (see issue1753). 3074 f.write(data) 3075 f.write(data) 3076 f.seek(0) 3077 self.assertEqual(f.read(), data * 2) 3078 f.seek(0) 3079 self.assertEqual(f.read(), data * 2) 3080 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 3081 3082 def test_unreadable(self): 3083 class UnReadable(self.BytesIO): 3084 def readable(self): 3085 return False 3086 txt = self.TextIOWrapper(UnReadable()) 3087 self.assertRaises(OSError, txt.read) 3088 3089 def test_read_one_by_one(self): 3090 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 3091 reads = "" 3092 while True: 3093 c = txt.read(1) 3094 if not c: 3095 break 3096 reads += c 3097 self.assertEqual(reads, "AA\nBB") 3098 3099 def test_readlines(self): 3100 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 3101 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 3102 txt.seek(0) 3103 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 3104 txt.seek(0) 3105 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 3106 3107 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 3108 def test_read_by_chunk(self): 3109 # make sure "\r\n" straddles 128 char boundary. 3110 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 3111 reads = "" 3112 while True: 3113 c = txt.read(128) 3114 if not c: 3115 break 3116 reads += c 3117 self.assertEqual(reads, "A"*127+"\nB") 3118 3119 def test_writelines(self): 3120 l = ['ab', 'cd', 'ef'] 3121 buf = self.BytesIO() 3122 txt = self.TextIOWrapper(buf) 3123 txt.writelines(l) 3124 txt.flush() 3125 self.assertEqual(buf.getvalue(), b'abcdef') 3126 3127 def test_writelines_userlist(self): 3128 l = UserList(['ab', 'cd', 'ef']) 3129 buf = self.BytesIO() 3130 txt = self.TextIOWrapper(buf) 3131 txt.writelines(l) 3132 txt.flush() 3133 self.assertEqual(buf.getvalue(), b'abcdef') 3134 3135 def test_writelines_error(self): 3136 txt = self.TextIOWrapper(self.BytesIO()) 3137 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 3138 self.assertRaises(TypeError, txt.writelines, None) 3139 self.assertRaises(TypeError, txt.writelines, b'abc') 3140 3141 def test_issue1395_1(self): 3142 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3143 3144 # read one char at a time 3145 reads = "" 3146 while True: 3147 c = txt.read(1) 3148 if not c: 3149 break 3150 reads += c 3151 self.assertEqual(reads, self.normalized) 3152 3153 def test_issue1395_2(self): 3154 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3155 txt._CHUNK_SIZE = 4 3156 3157 reads = "" 3158 while True: 3159 c = txt.read(4) 3160 if not c: 3161 break 3162 reads += c 3163 self.assertEqual(reads, self.normalized) 3164 3165 def test_issue1395_3(self): 3166 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3167 txt._CHUNK_SIZE = 4 3168 3169 reads = txt.read(4) 3170 reads += txt.read(4) 3171 reads += txt.readline() 3172 reads += txt.readline() 3173 reads += txt.readline() 3174 self.assertEqual(reads, self.normalized) 3175 3176 def test_issue1395_4(self): 3177 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3178 txt._CHUNK_SIZE = 4 3179 3180 reads = txt.read(4) 3181 reads += txt.read() 3182 self.assertEqual(reads, self.normalized) 3183 3184 def test_issue1395_5(self): 3185 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3186 txt._CHUNK_SIZE = 4 3187 3188 reads = txt.read(4) 3189 pos = txt.tell() 3190 txt.seek(0) 3191 txt.seek(pos) 3192 self.assertEqual(txt.read(4), "BBB\n") 3193 3194 def test_issue2282(self): 3195 buffer = self.BytesIO(self.testdata) 3196 txt = self.TextIOWrapper(buffer, encoding="ascii") 3197 3198 self.assertEqual(buffer.seekable(), txt.seekable()) 3199 3200 def test_append_bom(self): 3201 # The BOM is not written again when appending to a non-empty file 3202 filename = support.TESTFN 3203 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3204 with self.open(filename, 'w', encoding=charset) as f: 3205 f.write('aaa') 3206 pos = f.tell() 3207 with self.open(filename, 'rb') as f: 3208 self.assertEqual(f.read(), 'aaa'.encode(charset)) 3209 3210 with self.open(filename, 'a', encoding=charset) as f: 3211 f.write('xxx') 3212 with self.open(filename, 'rb') as f: 3213 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3214 3215 def test_seek_bom(self): 3216 # Same test, but when seeking manually 3217 filename = support.TESTFN 3218 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3219 with self.open(filename, 'w', encoding=charset) as f: 3220 f.write('aaa') 3221 pos = f.tell() 3222 with self.open(filename, 'r+', encoding=charset) as f: 3223 f.seek(pos) 3224 f.write('zzz') 3225 f.seek(0) 3226 f.write('bbb') 3227 with self.open(filename, 'rb') as f: 3228 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 3229 3230 def test_seek_append_bom(self): 3231 # Same test, but first seek to the start and then to the end 3232 filename = support.TESTFN 3233 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3234 with self.open(filename, 'w', encoding=charset) as f: 3235 f.write('aaa') 3236 with self.open(filename, 'a', encoding=charset) as f: 3237 f.seek(0) 3238 f.seek(0, self.SEEK_END) 3239 f.write('xxx') 3240 with self.open(filename, 'rb') as f: 3241 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3242 3243 def test_errors_property(self): 3244 with self.open(support.TESTFN, "w") as f: 3245 self.assertEqual(f.errors, "strict") 3246 with self.open(support.TESTFN, "w", errors="replace") as f: 3247 self.assertEqual(f.errors, "replace") 3248 3249 @support.no_tracing 3250 def test_threads_write(self): 3251 # Issue6750: concurrent writes could duplicate data 3252 event = threading.Event() 3253 with self.open(support.TESTFN, "w", buffering=1) as f: 3254 def run(n): 3255 text = "Thread%03d\n" % n 3256 event.wait() 3257 f.write(text) 3258 threads = [threading.Thread(target=run, args=(x,)) 3259 for x in range(20)] 3260 with support.start_threads(threads, event.set): 3261 time.sleep(0.02) 3262 with self.open(support.TESTFN) as f: 3263 content = f.read() 3264 for n in range(20): 3265 self.assertEqual(content.count("Thread%03d\n" % n), 1) 3266 3267 def test_flush_error_on_close(self): 3268 # Test that text file is closed despite failed flush 3269 # and that flush() is called before file closed. 3270 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3271 closed = [] 3272 def bad_flush(): 3273 closed[:] = [txt.closed, txt.buffer.closed] 3274 raise OSError() 3275 txt.flush = bad_flush 3276 self.assertRaises(OSError, txt.close) # exception not swallowed 3277 self.assertTrue(txt.closed) 3278 self.assertTrue(txt.buffer.closed) 3279 self.assertTrue(closed) # flush() called 3280 self.assertFalse(closed[0]) # flush() called before file closed 3281 self.assertFalse(closed[1]) 3282 txt.flush = lambda: None # break reference loop 3283 3284 def test_close_error_on_close(self): 3285 buffer = self.BytesIO(self.testdata) 3286 def bad_flush(): 3287 raise OSError('flush') 3288 def bad_close(): 3289 raise OSError('close') 3290 buffer.close = bad_close 3291 txt = self.TextIOWrapper(buffer, encoding="ascii") 3292 txt.flush = bad_flush 3293 with self.assertRaises(OSError) as err: # exception not swallowed 3294 txt.close() 3295 self.assertEqual(err.exception.args, ('close',)) 3296 self.assertIsInstance(err.exception.__context__, OSError) 3297 self.assertEqual(err.exception.__context__.args, ('flush',)) 3298 self.assertFalse(txt.closed) 3299 3300 # Silence destructor error 3301 buffer.close = lambda: None 3302 txt.flush = lambda: None 3303 3304 def test_nonnormalized_close_error_on_close(self): 3305 # Issue #21677 3306 buffer = self.BytesIO(self.testdata) 3307 def bad_flush(): 3308 raise non_existing_flush 3309 def bad_close(): 3310 raise non_existing_close 3311 buffer.close = bad_close 3312 txt = self.TextIOWrapper(buffer, encoding="ascii") 3313 txt.flush = bad_flush 3314 with self.assertRaises(NameError) as err: # exception not swallowed 3315 txt.close() 3316 self.assertIn('non_existing_close', str(err.exception)) 3317 self.assertIsInstance(err.exception.__context__, NameError) 3318 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3319 self.assertFalse(txt.closed) 3320 3321 # Silence destructor error 3322 buffer.close = lambda: None 3323 txt.flush = lambda: None 3324 3325 def test_multi_close(self): 3326 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3327 txt.close() 3328 txt.close() 3329 txt.close() 3330 self.assertRaises(ValueError, txt.flush) 3331 3332 def test_unseekable(self): 3333 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) 3334 self.assertRaises(self.UnsupportedOperation, txt.tell) 3335 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3336 3337 def test_readonly_attributes(self): 3338 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3339 buf = self.BytesIO(self.testdata) 3340 with self.assertRaises(AttributeError): 3341 txt.buffer = buf 3342 3343 def test_rawio(self): 3344 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3345 # that subprocess.Popen() can have the required unbuffered 3346 # semantics with universal_newlines=True. 3347 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3348 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3349 # Reads 3350 self.assertEqual(txt.read(4), 'abcd') 3351 self.assertEqual(txt.readline(), 'efghi\n') 3352 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3353 3354 def test_rawio_write_through(self): 3355 # Issue #12591: with write_through=True, writes don't need a flush 3356 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3357 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3358 write_through=True) 3359 txt.write('1') 3360 txt.write('23\n4') 3361 txt.write('5') 3362 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3363 3364 def test_bufio_write_through(self): 3365 # Issue #21396: write_through=True doesn't force a flush() 3366 # on the underlying binary buffered object. 3367 flush_called, write_called = [], [] 3368 class BufferedWriter(self.BufferedWriter): 3369 def flush(self, *args, **kwargs): 3370 flush_called.append(True) 3371 return super().flush(*args, **kwargs) 3372 def write(self, *args, **kwargs): 3373 write_called.append(True) 3374 return super().write(*args, **kwargs) 3375 3376 rawio = self.BytesIO() 3377 data = b"a" 3378 bufio = BufferedWriter(rawio, len(data)*2) 3379 textio = self.TextIOWrapper(bufio, encoding='ascii', 3380 write_through=True) 3381 # write to the buffered io but don't overflow the buffer 3382 text = data.decode('ascii') 3383 textio.write(text) 3384 3385 # buffer.flush is not called with write_through=True 3386 self.assertFalse(flush_called) 3387 # buffer.write *is* called with write_through=True 3388 self.assertTrue(write_called) 3389 self.assertEqual(rawio.getvalue(), b"") # no flush 3390 3391 write_called = [] # reset 3392 textio.write(text * 10) # total content is larger than bufio buffer 3393 self.assertTrue(write_called) 3394 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3395 3396 def test_reconfigure_write_through(self): 3397 raw = self.MockRawIO([]) 3398 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3399 t.write('1') 3400 t.reconfigure(write_through=True) # implied flush 3401 self.assertEqual(t.write_through, True) 3402 self.assertEqual(b''.join(raw._write_stack), b'1') 3403 t.write('23') 3404 self.assertEqual(b''.join(raw._write_stack), b'123') 3405 t.reconfigure(write_through=False) 3406 self.assertEqual(t.write_through, False) 3407 t.write('45') 3408 t.flush() 3409 self.assertEqual(b''.join(raw._write_stack), b'12345') 3410 # Keeping default value 3411 t.reconfigure() 3412 t.reconfigure(write_through=None) 3413 self.assertEqual(t.write_through, False) 3414 t.reconfigure(write_through=True) 3415 t.reconfigure() 3416 t.reconfigure(write_through=None) 3417 self.assertEqual(t.write_through, True) 3418 3419 def test_read_nonbytes(self): 3420 # Issue #17106 3421 # Crash when underlying read() returns non-bytes 3422 t = self.TextIOWrapper(self.StringIO('a')) 3423 self.assertRaises(TypeError, t.read, 1) 3424 t = self.TextIOWrapper(self.StringIO('a')) 3425 self.assertRaises(TypeError, t.readline) 3426 t = self.TextIOWrapper(self.StringIO('a')) 3427 self.assertRaises(TypeError, t.read) 3428 3429 def test_illegal_encoder(self): 3430 # Issue 31271: Calling write() while the return value of encoder's 3431 # encode() is invalid shouldn't cause an assertion failure. 3432 rot13 = codecs.lookup("rot13") 3433 with support.swap_attr(rot13, '_is_text_encoding', True): 3434 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 3435 self.assertRaises(TypeError, t.write, 'bar') 3436 3437 def test_illegal_decoder(self): 3438 # Issue #17106 3439 # Bypass the early encoding check added in issue 20404 3440 def _make_illegal_wrapper(): 3441 quopri = codecs.lookup("quopri") 3442 quopri._is_text_encoding = True 3443 try: 3444 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3445 newline='\n', encoding="quopri") 3446 finally: 3447 quopri._is_text_encoding = False 3448 return t 3449 # Crash when decoder returns non-string 3450 t = _make_illegal_wrapper() 3451 self.assertRaises(TypeError, t.read, 1) 3452 t = _make_illegal_wrapper() 3453 self.assertRaises(TypeError, t.readline) 3454 t = _make_illegal_wrapper() 3455 self.assertRaises(TypeError, t.read) 3456 3457 # Issue 31243: calling read() while the return value of decoder's 3458 # getstate() is invalid should neither crash the interpreter nor 3459 # raise a SystemError. 3460 def _make_very_illegal_wrapper(getstate_ret_val): 3461 class BadDecoder: 3462 def getstate(self): 3463 return getstate_ret_val 3464 def _get_bad_decoder(dummy): 3465 return BadDecoder() 3466 quopri = codecs.lookup("quopri") 3467 with support.swap_attr(quopri, 'incrementaldecoder', 3468 _get_bad_decoder): 3469 return _make_illegal_wrapper() 3470 t = _make_very_illegal_wrapper(42) 3471 self.assertRaises(TypeError, t.read, 42) 3472 t = _make_very_illegal_wrapper(()) 3473 self.assertRaises(TypeError, t.read, 42) 3474 t = _make_very_illegal_wrapper((1, 2)) 3475 self.assertRaises(TypeError, t.read, 42) 3476 3477 def _check_create_at_shutdown(self, **kwargs): 3478 # Issue #20037: creating a TextIOWrapper at shutdown 3479 # shouldn't crash the interpreter. 3480 iomod = self.io.__name__ 3481 code = """if 1: 3482 import codecs 3483 import {iomod} as io 3484 3485 # Avoid looking up codecs at shutdown 3486 codecs.lookup('utf-8') 3487 3488 class C: 3489 def __init__(self): 3490 self.buf = io.BytesIO() 3491 def __del__(self): 3492 io.TextIOWrapper(self.buf, **{kwargs}) 3493 print("ok") 3494 c = C() 3495 """.format(iomod=iomod, kwargs=kwargs) 3496 return assert_python_ok("-c", code) 3497 3498 @support.requires_type_collecting 3499 def test_create_at_shutdown_without_encoding(self): 3500 rc, out, err = self._check_create_at_shutdown() 3501 if err: 3502 # Can error out with a RuntimeError if the module state 3503 # isn't found. 3504 self.assertIn(self.shutdown_error, err.decode()) 3505 else: 3506 self.assertEqual("ok", out.decode().strip()) 3507 3508 @support.requires_type_collecting 3509 def test_create_at_shutdown_with_encoding(self): 3510 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3511 errors='strict') 3512 self.assertFalse(err) 3513 self.assertEqual("ok", out.decode().strip()) 3514 3515 def test_read_byteslike(self): 3516 r = MemviewBytesIO(b'Just some random string\n') 3517 t = self.TextIOWrapper(r, 'utf-8') 3518 3519 # TextIOwrapper will not read the full string, because 3520 # we truncate it to a multiple of the native int size 3521 # so that we can construct a more complex memoryview. 3522 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3523 3524 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3525 3526 def test_issue22849(self): 3527 class F(object): 3528 def readable(self): return True 3529 def writable(self): return True 3530 def seekable(self): return True 3531 3532 for i in range(10): 3533 try: 3534 self.TextIOWrapper(F(), encoding='utf-8') 3535 except Exception: 3536 pass 3537 3538 F.tell = lambda x: 0 3539 t = self.TextIOWrapper(F(), encoding='utf-8') 3540 3541 def test_reconfigure_encoding_read(self): 3542 # latin1 -> utf8 3543 # (latin1 can decode utf-8 encoded string) 3544 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') 3545 raw = self.BytesIO(data) 3546 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3547 self.assertEqual(txt.readline(), 'abc\xe9\n') 3548 with self.assertRaises(self.UnsupportedOperation): 3549 txt.reconfigure(encoding='utf-8') 3550 with self.assertRaises(self.UnsupportedOperation): 3551 txt.reconfigure(newline=None) 3552 3553 def test_reconfigure_write_fromascii(self): 3554 # ascii has a specific encodefunc in the C implementation, 3555 # but utf-8-sig has not. Make sure that we get rid of the 3556 # cached encodefunc when we switch encoders. 3557 raw = self.BytesIO() 3558 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3559 txt.write('foo\n') 3560 txt.reconfigure(encoding='utf-8-sig') 3561 txt.write('\xe9\n') 3562 txt.flush() 3563 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') 3564 3565 def test_reconfigure_write(self): 3566 # latin -> utf8 3567 raw = self.BytesIO() 3568 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3569 txt.write('abc\xe9\n') 3570 txt.reconfigure(encoding='utf-8') 3571 self.assertEqual(raw.getvalue(), b'abc\xe9\n') 3572 txt.write('d\xe9f\n') 3573 txt.flush() 3574 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') 3575 3576 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of 3577 # the file 3578 raw = self.BytesIO() 3579 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3580 txt.write('abc\n') 3581 txt.reconfigure(encoding='utf-8-sig') 3582 txt.write('d\xe9f\n') 3583 txt.flush() 3584 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') 3585 3586 def test_reconfigure_write_non_seekable(self): 3587 raw = self.BytesIO() 3588 raw.seekable = lambda: False 3589 raw.seek = None 3590 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3591 txt.write('abc\n') 3592 txt.reconfigure(encoding='utf-8-sig') 3593 txt.write('d\xe9f\n') 3594 txt.flush() 3595 3596 # If the raw stream is not seekable, there'll be a BOM 3597 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') 3598 3599 def test_reconfigure_defaults(self): 3600 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') 3601 txt.reconfigure(encoding=None) 3602 self.assertEqual(txt.encoding, 'ascii') 3603 self.assertEqual(txt.errors, 'replace') 3604 txt.write('LF\n') 3605 3606 txt.reconfigure(newline='\r\n') 3607 self.assertEqual(txt.encoding, 'ascii') 3608 self.assertEqual(txt.errors, 'replace') 3609 3610 txt.reconfigure(errors='ignore') 3611 self.assertEqual(txt.encoding, 'ascii') 3612 self.assertEqual(txt.errors, 'ignore') 3613 txt.write('CRLF\n') 3614 3615 txt.reconfigure(encoding='utf-8', newline=None) 3616 self.assertEqual(txt.errors, 'strict') 3617 txt.seek(0) 3618 self.assertEqual(txt.read(), 'LF\nCRLF\n') 3619 3620 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') 3621 3622 def test_reconfigure_newline(self): 3623 raw = self.BytesIO(b'CR\rEOF') 3624 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3625 txt.reconfigure(newline=None) 3626 self.assertEqual(txt.readline(), 'CR\n') 3627 raw = self.BytesIO(b'CR\rEOF') 3628 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3629 txt.reconfigure(newline='') 3630 self.assertEqual(txt.readline(), 'CR\r') 3631 raw = self.BytesIO(b'CR\rLF\nEOF') 3632 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3633 txt.reconfigure(newline='\n') 3634 self.assertEqual(txt.readline(), 'CR\rLF\n') 3635 raw = self.BytesIO(b'LF\nCR\rEOF') 3636 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3637 txt.reconfigure(newline='\r') 3638 self.assertEqual(txt.readline(), 'LF\nCR\r') 3639 raw = self.BytesIO(b'CR\rCRLF\r\nEOF') 3640 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3641 txt.reconfigure(newline='\r\n') 3642 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') 3643 3644 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') 3645 txt.reconfigure(newline=None) 3646 txt.write('linesep\n') 3647 txt.reconfigure(newline='') 3648 txt.write('LF\n') 3649 txt.reconfigure(newline='\n') 3650 txt.write('LF\n') 3651 txt.reconfigure(newline='\r') 3652 txt.write('CR\n') 3653 txt.reconfigure(newline='\r\n') 3654 txt.write('CRLF\n') 3655 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' 3656 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) 3657 3658 def test_issue25862(self): 3659 # Assertion failures occurred in tell() after read() and write(). 3660 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3661 t.read(1) 3662 t.read() 3663 t.tell() 3664 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3665 t.read(1) 3666 t.write('x') 3667 t.tell() 3668 3669 3670class MemviewBytesIO(io.BytesIO): 3671 '''A BytesIO object whose read method returns memoryviews 3672 rather than bytes''' 3673 3674 def read1(self, len_): 3675 return _to_memoryview(super().read1(len_)) 3676 3677 def read(self, len_): 3678 return _to_memoryview(super().read(len_)) 3679 3680def _to_memoryview(buf): 3681 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3682 3683 arr = array.array('i') 3684 idx = len(buf) - len(buf) % arr.itemsize 3685 arr.frombytes(buf[:idx]) 3686 return memoryview(arr) 3687 3688 3689class CTextIOWrapperTest(TextIOWrapperTest): 3690 io = io 3691 shutdown_error = "RuntimeError: could not find io module state" 3692 3693 def test_initialization(self): 3694 r = self.BytesIO(b"\xc3\xa9\n\n") 3695 b = self.BufferedReader(r, 1000) 3696 t = self.TextIOWrapper(b) 3697 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 3698 self.assertRaises(ValueError, t.read) 3699 3700 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3701 self.assertRaises(Exception, repr, t) 3702 3703 def test_garbage_collection(self): 3704 # C TextIOWrapper objects are collected, and collecting them flushes 3705 # all data to disk. 3706 # The Python version has __del__, so it ends in gc.garbage instead. 3707 with support.check_warnings(('', ResourceWarning)): 3708 rawio = io.FileIO(support.TESTFN, "wb") 3709 b = self.BufferedWriter(rawio) 3710 t = self.TextIOWrapper(b, encoding="ascii") 3711 t.write("456def") 3712 t.x = t 3713 wr = weakref.ref(t) 3714 del t 3715 support.gc_collect() 3716 self.assertIsNone(wr(), wr) 3717 with self.open(support.TESTFN, "rb") as f: 3718 self.assertEqual(f.read(), b"456def") 3719 3720 def test_rwpair_cleared_before_textio(self): 3721 # Issue 13070: TextIOWrapper's finalization would crash when called 3722 # after the reference to the underlying BufferedRWPair's writer got 3723 # cleared by the GC. 3724 for i in range(1000): 3725 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3726 t1 = self.TextIOWrapper(b1, encoding="ascii") 3727 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3728 t2 = self.TextIOWrapper(b2, encoding="ascii") 3729 # circular references 3730 t1.buddy = t2 3731 t2.buddy = t1 3732 support.gc_collect() 3733 3734 def test_del__CHUNK_SIZE_SystemError(self): 3735 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') 3736 with self.assertRaises(AttributeError): 3737 del t._CHUNK_SIZE 3738 3739 3740class PyTextIOWrapperTest(TextIOWrapperTest): 3741 io = pyio 3742 shutdown_error = "LookupError: unknown encoding: ascii" 3743 3744 3745class IncrementalNewlineDecoderTest(unittest.TestCase): 3746 3747 def check_newline_decoding_utf8(self, decoder): 3748 # UTF-8 specific tests for a newline decoder 3749 def _check_decode(b, s, **kwargs): 3750 # We exercise getstate() / setstate() as well as decode() 3751 state = decoder.getstate() 3752 self.assertEqual(decoder.decode(b, **kwargs), s) 3753 decoder.setstate(state) 3754 self.assertEqual(decoder.decode(b, **kwargs), s) 3755 3756 _check_decode(b'\xe8\xa2\x88', "\u8888") 3757 3758 _check_decode(b'\xe8', "") 3759 _check_decode(b'\xa2', "") 3760 _check_decode(b'\x88', "\u8888") 3761 3762 _check_decode(b'\xe8', "") 3763 _check_decode(b'\xa2', "") 3764 _check_decode(b'\x88', "\u8888") 3765 3766 _check_decode(b'\xe8', "") 3767 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3768 3769 decoder.reset() 3770 _check_decode(b'\n', "\n") 3771 _check_decode(b'\r', "") 3772 _check_decode(b'', "\n", final=True) 3773 _check_decode(b'\r', "\n", final=True) 3774 3775 _check_decode(b'\r', "") 3776 _check_decode(b'a', "\na") 3777 3778 _check_decode(b'\r\r\n', "\n\n") 3779 _check_decode(b'\r', "") 3780 _check_decode(b'\r', "\n") 3781 _check_decode(b'\na', "\na") 3782 3783 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3784 _check_decode(b'\xe8\xa2\x88', "\u8888") 3785 _check_decode(b'\n', "\n") 3786 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3787 _check_decode(b'\n', "\n") 3788 3789 def check_newline_decoding(self, decoder, encoding): 3790 result = [] 3791 if encoding is not None: 3792 encoder = codecs.getincrementalencoder(encoding)() 3793 def _decode_bytewise(s): 3794 # Decode one byte at a time 3795 for b in encoder.encode(s): 3796 result.append(decoder.decode(bytes([b]))) 3797 else: 3798 encoder = None 3799 def _decode_bytewise(s): 3800 # Decode one char at a time 3801 for c in s: 3802 result.append(decoder.decode(c)) 3803 self.assertEqual(decoder.newlines, None) 3804 _decode_bytewise("abc\n\r") 3805 self.assertEqual(decoder.newlines, '\n') 3806 _decode_bytewise("\nabc") 3807 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3808 _decode_bytewise("abc\r") 3809 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3810 _decode_bytewise("abc") 3811 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3812 _decode_bytewise("abc\r") 3813 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3814 decoder.reset() 3815 input = "abc" 3816 if encoder is not None: 3817 encoder.reset() 3818 input = encoder.encode(input) 3819 self.assertEqual(decoder.decode(input), "abc") 3820 self.assertEqual(decoder.newlines, None) 3821 3822 def test_newline_decoder(self): 3823 encodings = ( 3824 # None meaning the IncrementalNewlineDecoder takes unicode input 3825 # rather than bytes input 3826 None, 'utf-8', 'latin-1', 3827 'utf-16', 'utf-16-le', 'utf-16-be', 3828 'utf-32', 'utf-32-le', 'utf-32-be', 3829 ) 3830 for enc in encodings: 3831 decoder = enc and codecs.getincrementaldecoder(enc)() 3832 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3833 self.check_newline_decoding(decoder, enc) 3834 decoder = codecs.getincrementaldecoder("utf-8")() 3835 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3836 self.check_newline_decoding_utf8(decoder) 3837 self.assertRaises(TypeError, decoder.setstate, 42) 3838 3839 def test_newline_bytes(self): 3840 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3841 def _check(dec): 3842 self.assertEqual(dec.newlines, None) 3843 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3844 self.assertEqual(dec.newlines, None) 3845 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3846 self.assertEqual(dec.newlines, None) 3847 dec = self.IncrementalNewlineDecoder(None, translate=False) 3848 _check(dec) 3849 dec = self.IncrementalNewlineDecoder(None, translate=True) 3850 _check(dec) 3851 3852 def test_translate(self): 3853 # issue 35062 3854 for translate in (-2, -1, 1, 2): 3855 decoder = codecs.getincrementaldecoder("utf-8")() 3856 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3857 self.check_newline_decoding_utf8(decoder) 3858 decoder = codecs.getincrementaldecoder("utf-8")() 3859 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3860 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3861 3862class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3863 pass 3864 3865class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3866 pass 3867 3868 3869# XXX Tests for open() 3870 3871class MiscIOTest(unittest.TestCase): 3872 3873 def tearDown(self): 3874 support.unlink(support.TESTFN) 3875 3876 def test___all__(self): 3877 for name in self.io.__all__: 3878 obj = getattr(self.io, name, None) 3879 self.assertIsNotNone(obj, name) 3880 if name in ("open", "open_code"): 3881 continue 3882 elif "error" in name.lower() or name == "UnsupportedOperation": 3883 self.assertTrue(issubclass(obj, Exception), name) 3884 elif not name.startswith("SEEK_"): 3885 self.assertTrue(issubclass(obj, self.IOBase)) 3886 3887 def test_attributes(self): 3888 f = self.open(support.TESTFN, "wb", buffering=0) 3889 self.assertEqual(f.mode, "wb") 3890 f.close() 3891 3892 with support.check_warnings(('', DeprecationWarning)): 3893 f = self.open(support.TESTFN, "U") 3894 self.assertEqual(f.name, support.TESTFN) 3895 self.assertEqual(f.buffer.name, support.TESTFN) 3896 self.assertEqual(f.buffer.raw.name, support.TESTFN) 3897 self.assertEqual(f.mode, "U") 3898 self.assertEqual(f.buffer.mode, "rb") 3899 self.assertEqual(f.buffer.raw.mode, "rb") 3900 f.close() 3901 3902 f = self.open(support.TESTFN, "w+") 3903 self.assertEqual(f.mode, "w+") 3904 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3905 self.assertEqual(f.buffer.raw.mode, "rb+") 3906 3907 g = self.open(f.fileno(), "wb", closefd=False) 3908 self.assertEqual(g.mode, "wb") 3909 self.assertEqual(g.raw.mode, "wb") 3910 self.assertEqual(g.name, f.fileno()) 3911 self.assertEqual(g.raw.name, f.fileno()) 3912 f.close() 3913 g.close() 3914 3915 def test_open_pipe_with_append(self): 3916 # bpo-27805: Ignore ESPIPE from lseek() in open(). 3917 r, w = os.pipe() 3918 self.addCleanup(os.close, r) 3919 f = self.open(w, 'a') 3920 self.addCleanup(f.close) 3921 # Check that the file is marked non-seekable. On Windows, however, lseek 3922 # somehow succeeds on pipes. 3923 if sys.platform != 'win32': 3924 self.assertFalse(f.seekable()) 3925 3926 def test_io_after_close(self): 3927 for kwargs in [ 3928 {"mode": "w"}, 3929 {"mode": "wb"}, 3930 {"mode": "w", "buffering": 1}, 3931 {"mode": "w", "buffering": 2}, 3932 {"mode": "wb", "buffering": 0}, 3933 {"mode": "r"}, 3934 {"mode": "rb"}, 3935 {"mode": "r", "buffering": 1}, 3936 {"mode": "r", "buffering": 2}, 3937 {"mode": "rb", "buffering": 0}, 3938 {"mode": "w+"}, 3939 {"mode": "w+b"}, 3940 {"mode": "w+", "buffering": 1}, 3941 {"mode": "w+", "buffering": 2}, 3942 {"mode": "w+b", "buffering": 0}, 3943 ]: 3944 f = self.open(support.TESTFN, **kwargs) 3945 f.close() 3946 self.assertRaises(ValueError, f.flush) 3947 self.assertRaises(ValueError, f.fileno) 3948 self.assertRaises(ValueError, f.isatty) 3949 self.assertRaises(ValueError, f.__iter__) 3950 if hasattr(f, "peek"): 3951 self.assertRaises(ValueError, f.peek, 1) 3952 self.assertRaises(ValueError, f.read) 3953 if hasattr(f, "read1"): 3954 self.assertRaises(ValueError, f.read1, 1024) 3955 self.assertRaises(ValueError, f.read1) 3956 if hasattr(f, "readall"): 3957 self.assertRaises(ValueError, f.readall) 3958 if hasattr(f, "readinto"): 3959 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 3960 if hasattr(f, "readinto1"): 3961 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 3962 self.assertRaises(ValueError, f.readline) 3963 self.assertRaises(ValueError, f.readlines) 3964 self.assertRaises(ValueError, f.readlines, 1) 3965 self.assertRaises(ValueError, f.seek, 0) 3966 self.assertRaises(ValueError, f.tell) 3967 self.assertRaises(ValueError, f.truncate) 3968 self.assertRaises(ValueError, f.write, 3969 b"" if "b" in kwargs['mode'] else "") 3970 self.assertRaises(ValueError, f.writelines, []) 3971 self.assertRaises(ValueError, next, f) 3972 3973 def test_blockingioerror(self): 3974 # Various BlockingIOError issues 3975 class C(str): 3976 pass 3977 c = C("") 3978 b = self.BlockingIOError(1, c) 3979 c.b = b 3980 b.c = c 3981 wr = weakref.ref(c) 3982 del c, b 3983 support.gc_collect() 3984 self.assertIsNone(wr(), wr) 3985 3986 def test_abcs(self): 3987 # Test the visible base classes are ABCs. 3988 self.assertIsInstance(self.IOBase, abc.ABCMeta) 3989 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 3990 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 3991 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 3992 3993 def _check_abc_inheritance(self, abcmodule): 3994 with self.open(support.TESTFN, "wb", buffering=0) as f: 3995 self.assertIsInstance(f, abcmodule.IOBase) 3996 self.assertIsInstance(f, abcmodule.RawIOBase) 3997 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3998 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3999 with self.open(support.TESTFN, "wb") as f: 4000 self.assertIsInstance(f, abcmodule.IOBase) 4001 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4002 self.assertIsInstance(f, abcmodule.BufferedIOBase) 4003 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4004 with self.open(support.TESTFN, "w") as f: 4005 self.assertIsInstance(f, abcmodule.IOBase) 4006 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4007 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4008 self.assertIsInstance(f, abcmodule.TextIOBase) 4009 4010 def test_abc_inheritance(self): 4011 # Test implementations inherit from their respective ABCs 4012 self._check_abc_inheritance(self) 4013 4014 def test_abc_inheritance_official(self): 4015 # Test implementations inherit from the official ABCs of the 4016 # baseline "io" module. 4017 self._check_abc_inheritance(io) 4018 4019 def _check_warn_on_dealloc(self, *args, **kwargs): 4020 f = open(*args, **kwargs) 4021 r = repr(f) 4022 with self.assertWarns(ResourceWarning) as cm: 4023 f = None 4024 support.gc_collect() 4025 self.assertIn(r, str(cm.warning.args[0])) 4026 4027 def test_warn_on_dealloc(self): 4028 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) 4029 self._check_warn_on_dealloc(support.TESTFN, "wb") 4030 self._check_warn_on_dealloc(support.TESTFN, "w") 4031 4032 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 4033 fds = [] 4034 def cleanup_fds(): 4035 for fd in fds: 4036 try: 4037 os.close(fd) 4038 except OSError as e: 4039 if e.errno != errno.EBADF: 4040 raise 4041 self.addCleanup(cleanup_fds) 4042 r, w = os.pipe() 4043 fds += r, w 4044 self._check_warn_on_dealloc(r, *args, **kwargs) 4045 # When using closefd=False, there's no warning 4046 r, w = os.pipe() 4047 fds += r, w 4048 with support.check_no_resource_warning(self): 4049 open(r, *args, closefd=False, **kwargs) 4050 4051 def test_warn_on_dealloc_fd(self): 4052 self._check_warn_on_dealloc_fd("rb", buffering=0) 4053 self._check_warn_on_dealloc_fd("rb") 4054 self._check_warn_on_dealloc_fd("r") 4055 4056 4057 def test_pickling(self): 4058 # Pickling file objects is forbidden 4059 for kwargs in [ 4060 {"mode": "w"}, 4061 {"mode": "wb"}, 4062 {"mode": "wb", "buffering": 0}, 4063 {"mode": "r"}, 4064 {"mode": "rb"}, 4065 {"mode": "rb", "buffering": 0}, 4066 {"mode": "w+"}, 4067 {"mode": "w+b"}, 4068 {"mode": "w+b", "buffering": 0}, 4069 ]: 4070 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 4071 with self.open(support.TESTFN, **kwargs) as f: 4072 self.assertRaises(TypeError, pickle.dumps, f, protocol) 4073 4074 def test_nonblock_pipe_write_bigbuf(self): 4075 self._test_nonblock_pipe_write(16*1024) 4076 4077 def test_nonblock_pipe_write_smallbuf(self): 4078 self._test_nonblock_pipe_write(1024) 4079 4080 @unittest.skipUnless(hasattr(os, 'set_blocking'), 4081 'os.set_blocking() required for this test') 4082 def _test_nonblock_pipe_write(self, bufsize): 4083 sent = [] 4084 received = [] 4085 r, w = os.pipe() 4086 os.set_blocking(r, False) 4087 os.set_blocking(w, False) 4088 4089 # To exercise all code paths in the C implementation we need 4090 # to play with buffer sizes. For instance, if we choose a 4091 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 4092 # then we will never get a partial write of the buffer. 4093 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 4094 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 4095 4096 with rf, wf: 4097 for N in 9999, 73, 7574: 4098 try: 4099 i = 0 4100 while True: 4101 msg = bytes([i % 26 + 97]) * N 4102 sent.append(msg) 4103 wf.write(msg) 4104 i += 1 4105 4106 except self.BlockingIOError as e: 4107 self.assertEqual(e.args[0], errno.EAGAIN) 4108 self.assertEqual(e.args[2], e.characters_written) 4109 sent[-1] = sent[-1][:e.characters_written] 4110 received.append(rf.read()) 4111 msg = b'BLOCKED' 4112 wf.write(msg) 4113 sent.append(msg) 4114 4115 while True: 4116 try: 4117 wf.flush() 4118 break 4119 except self.BlockingIOError as e: 4120 self.assertEqual(e.args[0], errno.EAGAIN) 4121 self.assertEqual(e.args[2], e.characters_written) 4122 self.assertEqual(e.characters_written, 0) 4123 received.append(rf.read()) 4124 4125 received += iter(rf.read, None) 4126 4127 sent, received = b''.join(sent), b''.join(received) 4128 self.assertEqual(sent, received) 4129 self.assertTrue(wf.closed) 4130 self.assertTrue(rf.closed) 4131 4132 def test_create_fail(self): 4133 # 'x' mode fails if file is existing 4134 with self.open(support.TESTFN, 'w'): 4135 pass 4136 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') 4137 4138 def test_create_writes(self): 4139 # 'x' mode opens for writing 4140 with self.open(support.TESTFN, 'xb') as f: 4141 f.write(b"spam") 4142 with self.open(support.TESTFN, 'rb') as f: 4143 self.assertEqual(b"spam", f.read()) 4144 4145 def test_open_allargs(self): 4146 # there used to be a buffer overflow in the parser for rawmode 4147 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') 4148 4149 4150class CMiscIOTest(MiscIOTest): 4151 io = io 4152 4153 def test_readinto_buffer_overflow(self): 4154 # Issue #18025 4155 class BadReader(self.io.BufferedIOBase): 4156 def read(self, n=-1): 4157 return b'x' * 10**6 4158 bufio = BadReader() 4159 b = bytearray(2) 4160 self.assertRaises(ValueError, bufio.readinto, b) 4161 4162 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4163 # Issue #23309: deadlocks at shutdown should be avoided when a 4164 # daemon thread and the main thread both write to a file. 4165 code = """if 1: 4166 import sys 4167 import time 4168 import threading 4169 from test.support import SuppressCrashReport 4170 4171 file = sys.{stream_name} 4172 4173 def run(): 4174 while True: 4175 file.write('.') 4176 file.flush() 4177 4178 crash = SuppressCrashReport() 4179 crash.__enter__() 4180 # don't call __exit__(): the crash occurs at Python shutdown 4181 4182 thread = threading.Thread(target=run) 4183 thread.daemon = True 4184 thread.start() 4185 4186 time.sleep(0.5) 4187 file.write('!') 4188 file.flush() 4189 """.format_map(locals()) 4190 res, _ = run_python_until_end("-c", code) 4191 err = res.err.decode() 4192 if res.rc != 0: 4193 # Failure: should be a fatal error 4194 pattern = (r"Fatal Python error: could not acquire lock " 4195 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> " 4196 r"at interpreter shutdown, possibly due to " 4197 r"daemon threads".format_map(locals())) 4198 self.assertRegex(err, pattern) 4199 else: 4200 self.assertFalse(err.strip('.!')) 4201 4202 def test_daemon_threads_shutdown_stdout_deadlock(self): 4203 self.check_daemon_threads_shutdown_deadlock('stdout') 4204 4205 def test_daemon_threads_shutdown_stderr_deadlock(self): 4206 self.check_daemon_threads_shutdown_deadlock('stderr') 4207 4208 4209class PyMiscIOTest(MiscIOTest): 4210 io = pyio 4211 4212 4213@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 4214class SignalsTest(unittest.TestCase): 4215 4216 def setUp(self): 4217 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 4218 4219 def tearDown(self): 4220 signal.signal(signal.SIGALRM, self.oldalrm) 4221 4222 def alarm_interrupt(self, sig, frame): 4223 1/0 4224 4225 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 4226 """Check that a partial write, when it gets interrupted, properly 4227 invokes the signal handler, and bubbles up the exception raised 4228 in the latter.""" 4229 read_results = [] 4230 def _read(): 4231 s = os.read(r, 1) 4232 read_results.append(s) 4233 4234 t = threading.Thread(target=_read) 4235 t.daemon = True 4236 r, w = os.pipe() 4237 fdopen_kwargs["closefd"] = False 4238 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) 4239 try: 4240 wio = self.io.open(w, **fdopen_kwargs) 4241 if hasattr(signal, 'pthread_sigmask'): 4242 # create the thread with SIGALRM signal blocked 4243 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) 4244 t.start() 4245 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM]) 4246 else: 4247 t.start() 4248 4249 # Fill the pipe enough that the write will be blocking. 4250 # It will be interrupted by the timer armed above. Since the 4251 # other thread has read one byte, the low-level write will 4252 # return with a successful (partial) result rather than an EINTR. 4253 # The buffered IO layer must check for pending signal 4254 # handlers, which in this case will invoke alarm_interrupt(). 4255 signal.alarm(1) 4256 try: 4257 self.assertRaises(ZeroDivisionError, wio.write, large_data) 4258 finally: 4259 signal.alarm(0) 4260 t.join() 4261 # We got one byte, get another one and check that it isn't a 4262 # repeat of the first one. 4263 read_results.append(os.read(r, 1)) 4264 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 4265 finally: 4266 os.close(w) 4267 os.close(r) 4268 # This is deliberate. If we didn't close the file descriptor 4269 # before closing wio, wio would try to flush its internal 4270 # buffer, and block again. 4271 try: 4272 wio.close() 4273 except OSError as e: 4274 if e.errno != errno.EBADF: 4275 raise 4276 4277 def test_interrupted_write_unbuffered(self): 4278 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 4279 4280 def test_interrupted_write_buffered(self): 4281 self.check_interrupted_write(b"xy", b"xy", mode="wb") 4282 4283 def test_interrupted_write_text(self): 4284 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 4285 4286 @support.no_tracing 4287 def check_reentrant_write(self, data, **fdopen_kwargs): 4288 def on_alarm(*args): 4289 # Will be called reentrantly from the same thread 4290 wio.write(data) 4291 1/0 4292 signal.signal(signal.SIGALRM, on_alarm) 4293 r, w = os.pipe() 4294 wio = self.io.open(w, **fdopen_kwargs) 4295 try: 4296 signal.alarm(1) 4297 # Either the reentrant call to wio.write() fails with RuntimeError, 4298 # or the signal handler raises ZeroDivisionError. 4299 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 4300 while 1: 4301 for i in range(100): 4302 wio.write(data) 4303 wio.flush() 4304 # Make sure the buffer doesn't fill up and block further writes 4305 os.read(r, len(data) * 100) 4306 exc = cm.exception 4307 if isinstance(exc, RuntimeError): 4308 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 4309 finally: 4310 signal.alarm(0) 4311 wio.close() 4312 os.close(r) 4313 4314 def test_reentrant_write_buffered(self): 4315 self.check_reentrant_write(b"xy", mode="wb") 4316 4317 def test_reentrant_write_text(self): 4318 self.check_reentrant_write("xy", mode="w", encoding="ascii") 4319 4320 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 4321 """Check that a buffered read, when it gets interrupted (either 4322 returning a partial result or EINTR), properly invokes the signal 4323 handler and retries if the latter returned successfully.""" 4324 r, w = os.pipe() 4325 fdopen_kwargs["closefd"] = False 4326 def alarm_handler(sig, frame): 4327 os.write(w, b"bar") 4328 signal.signal(signal.SIGALRM, alarm_handler) 4329 try: 4330 rio = self.io.open(r, **fdopen_kwargs) 4331 os.write(w, b"foo") 4332 signal.alarm(1) 4333 # Expected behaviour: 4334 # - first raw read() returns partial b"foo" 4335 # - second raw read() returns EINTR 4336 # - third raw read() returns b"bar" 4337 self.assertEqual(decode(rio.read(6)), "foobar") 4338 finally: 4339 signal.alarm(0) 4340 rio.close() 4341 os.close(w) 4342 os.close(r) 4343 4344 def test_interrupted_read_retry_buffered(self): 4345 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 4346 mode="rb") 4347 4348 def test_interrupted_read_retry_text(self): 4349 self.check_interrupted_read_retry(lambda x: x, 4350 mode="r") 4351 4352 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 4353 """Check that a buffered write, when it gets interrupted (either 4354 returning a partial result or EINTR), properly invokes the signal 4355 handler and retries if the latter returned successfully.""" 4356 select = support.import_module("select") 4357 4358 # A quantity that exceeds the buffer size of an anonymous pipe's 4359 # write end. 4360 N = support.PIPE_MAX_SIZE 4361 r, w = os.pipe() 4362 fdopen_kwargs["closefd"] = False 4363 4364 # We need a separate thread to read from the pipe and allow the 4365 # write() to finish. This thread is started after the SIGALRM is 4366 # received (forcing a first EINTR in write()). 4367 read_results = [] 4368 write_finished = False 4369 error = None 4370 def _read(): 4371 try: 4372 while not write_finished: 4373 while r in select.select([r], [], [], 1.0)[0]: 4374 s = os.read(r, 1024) 4375 read_results.append(s) 4376 except BaseException as exc: 4377 nonlocal error 4378 error = exc 4379 t = threading.Thread(target=_read) 4380 t.daemon = True 4381 def alarm1(sig, frame): 4382 signal.signal(signal.SIGALRM, alarm2) 4383 signal.alarm(1) 4384 def alarm2(sig, frame): 4385 t.start() 4386 4387 large_data = item * N 4388 signal.signal(signal.SIGALRM, alarm1) 4389 try: 4390 wio = self.io.open(w, **fdopen_kwargs) 4391 signal.alarm(1) 4392 # Expected behaviour: 4393 # - first raw write() is partial (because of the limited pipe buffer 4394 # and the first alarm) 4395 # - second raw write() returns EINTR (because of the second alarm) 4396 # - subsequent write()s are successful (either partial or complete) 4397 written = wio.write(large_data) 4398 self.assertEqual(N, written) 4399 4400 wio.flush() 4401 write_finished = True 4402 t.join() 4403 4404 self.assertIsNone(error) 4405 self.assertEqual(N, sum(len(x) for x in read_results)) 4406 finally: 4407 signal.alarm(0) 4408 write_finished = True 4409 os.close(w) 4410 os.close(r) 4411 # This is deliberate. If we didn't close the file descriptor 4412 # before closing wio, wio would try to flush its internal 4413 # buffer, and could block (in case of failure). 4414 try: 4415 wio.close() 4416 except OSError as e: 4417 if e.errno != errno.EBADF: 4418 raise 4419 4420 def test_interrupted_write_retry_buffered(self): 4421 self.check_interrupted_write_retry(b"x", mode="wb") 4422 4423 def test_interrupted_write_retry_text(self): 4424 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 4425 4426 4427class CSignalsTest(SignalsTest): 4428 io = io 4429 4430class PySignalsTest(SignalsTest): 4431 io = pyio 4432 4433 # Handling reentrancy issues would slow down _pyio even more, so the 4434 # tests are disabled. 4435 test_reentrant_write_buffered = None 4436 test_reentrant_write_text = None 4437 4438 4439def load_tests(*args): 4440 tests = (CIOTest, PyIOTest, APIMismatchTest, 4441 CBufferedReaderTest, PyBufferedReaderTest, 4442 CBufferedWriterTest, PyBufferedWriterTest, 4443 CBufferedRWPairTest, PyBufferedRWPairTest, 4444 CBufferedRandomTest, PyBufferedRandomTest, 4445 StatefulIncrementalDecoderTest, 4446 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 4447 CTextIOWrapperTest, PyTextIOWrapperTest, 4448 CMiscIOTest, PyMiscIOTest, 4449 CSignalsTest, PySignalsTest, 4450 ) 4451 4452 # Put the namespaces of the IO module we are testing and some useful mock 4453 # classes in the __dict__ of each test. 4454 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 4455 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead, 4456 SlowFlushRawIO) 4457 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 4458 c_io_ns = {name : getattr(io, name) for name in all_members} 4459 py_io_ns = {name : getattr(pyio, name) for name in all_members} 4460 globs = globals() 4461 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 4462 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 4463 # Avoid turning open into a bound method. 4464 py_io_ns["open"] = pyio.OpenWrapper 4465 for test in tests: 4466 if test.__name__.startswith("C"): 4467 for name, obj in c_io_ns.items(): 4468 setattr(test, name, obj) 4469 elif test.__name__.startswith("Py"): 4470 for name, obj in py_io_ns.items(): 4471 setattr(test, name, obj) 4472 4473 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests]) 4474 return suite 4475 4476if __name__ == "__main__": 4477 unittest.main() 4478